diff --git a/src/main/java/it/cavallium/strangedb/database/DatabaseCore.java b/src/main/java/it/cavallium/strangedb/database/DatabaseCore.java index ca27345..ab662b7 100644 --- a/src/main/java/it/cavallium/strangedb/database/DatabaseCore.java +++ b/src/main/java/it/cavallium/strangedb/database/DatabaseCore.java @@ -1,7 +1,5 @@ package it.cavallium.strangedb.database; -import it.cavallium.strangedb.database.blocks.DatabaseBlocksIO; -import it.cavallium.strangedb.database.blocks.DatabaseBlocksMetadata; import it.cavallium.strangedb.database.references.DatabaseReferencesIO; import it.cavallium.strangedb.database.references.DatabaseReferencesMetadata; import it.cavallium.strangedb.database.references.ReferenceInfo; @@ -17,33 +15,24 @@ import static it.cavallium.strangedb.database.references.DatabaseReferencesMetad public class DatabaseCore implements IDatabase { private final DatabaseFileIO fileIO; - private final DatabaseBlocksIO blocksIO; - protected final DatabaseBlocksMetadata blocksMetadata; protected final DatabaseReferencesIO referencesIO; protected final DatabaseReferencesMetadata referencesMetadata; protected final Path dataFile; - protected final Path blocksMetaFile; protected final Path referencesMetaFile; protected volatile boolean closed; - public DatabaseCore(Path dataFile, Path blocksMetaFile, Path referencesMetaFile) throws IOException { + public DatabaseCore(Path dataFile, Path referencesMetaFile) throws IOException { if (Files.notExists(dataFile)) { Files.createFile(dataFile); } - if (Files.notExists(blocksMetaFile)) { - Files.createFile(blocksMetaFile); - } if (Files.notExists(referencesMetaFile)) { Files.createFile(referencesMetaFile); } this.dataFile = dataFile; - this.blocksMetaFile = blocksMetaFile; this.referencesMetaFile = referencesMetaFile; this.fileIO = new DatabaseFileIO(dataFile); - this.blocksMetadata = new DatabaseBlocksMetadata(blocksMetaFile); - this.blocksIO = new DatabaseBlocksIO(fileIO, blocksMetadata); this.referencesMetadata = new DatabaseReferencesMetadata(referencesMetaFile); - this.referencesIO = new DatabaseReferencesIO(blocksIO, referencesMetadata); + this.referencesIO = new DatabaseReferencesIO(fileIO, referencesMetadata); } @Override @@ -52,7 +41,6 @@ public class DatabaseCore implements IDatabase { throw new IOException("The database has been already closed!"); } this.referencesMetadata.close(); - this.blocksMetadata.close(); this.fileIO.close(); this.closed = true; } @@ -68,45 +56,35 @@ public class DatabaseCore implements IDatabase { this.close(); } Path newDataFile = dataFile.resolveSibling("compressed-data-file.tmp"); - Path newBlocksFile = blocksMetaFile.resolveSibling("compressed-blocks-file.tmp"); Path newReferencesFile = referencesMetaFile.resolveSibling("compressed-references-file.tmp"); Path backupDataFile = dataFile.resolveSibling("backup-data.db.bak"); - Path backupBlocksFile = blocksMetaFile.resolveSibling("backup-blocks.dat.bak"); Path backupReferencesFile = referencesMetaFile.resolveSibling("backup-references.dat.bak"); Files.copy(dataFile, backupDataFile, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.COPY_ATTRIBUTES); - Files.copy(blocksMetaFile, backupBlocksFile, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.COPY_ATTRIBUTES); Files.copy(referencesMetaFile, backupReferencesFile, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.COPY_ATTRIBUTES); Files.move(dataFile, newDataFile, StandardCopyOption.REPLACE_EXISTING); - Files.move(blocksMetaFile, newBlocksFile, StandardCopyOption.REPLACE_EXISTING); Files.move(referencesMetaFile, newReferencesFile, StandardCopyOption.REPLACE_EXISTING); - DatabaseCore databaseToClean = new DatabaseCore(newDataFile, newBlocksFile, newReferencesFile); - DatabaseCore newDatabase = new DatabaseCore(dataFile, blocksMetaFile, referencesMetaFile); + DatabaseCore databaseToClean = new DatabaseCore(newDataFile, newReferencesFile); + DatabaseCore newDatabase = new DatabaseCore(dataFile, referencesMetaFile); long referencesCount = databaseToClean.referencesMetadata.getFirstFreeReference(); - long blocksCount = databaseToClean.blocksMetadata.getTotalBlocksCount(); long writtenReferences = 0; - long writtenBlocks = 0; for (int referenceID = 0; referenceID < referencesCount; referenceID++) { try { - ReferenceInfo ref = databaseToClean.referencesMetadata.getCleanReference(referenceID); + ReferenceInfo ref = databaseToClean.referencesMetadata.getReferenceInfo(referenceID); if (!NONEXISTENT_REFERENCE_INFO.equals(ref)) { ByteBuffer buffer = databaseToClean.referencesIO.readFromReference(referenceID); newDatabase.referencesIO.writeToReference(referenceID, ref.getCleanerId(), buffer.limit(), buffer); writtenReferences++; - if (buffer.limit() > 0) { - writtenBlocks++; - } } } catch (IOException ex) { System.out.println("Error while reading reference " + referenceID + ". References written: " + writtenReferences); } } - System.out.println("[Core Cleaner] References written: " + writtenReferences + ". Removed " + (blocksCount - writtenBlocks) + " blocks. Removed " + (referencesCount - writtenReferences) + " references."); + System.out.println("[Core Cleaner] References written: " + writtenReferences + ". Removed " + (referencesCount - writtenReferences) + " references."); databaseToClean.close(); newDatabase.close(); Files.deleteIfExists(newDataFile); - Files.deleteIfExists(newBlocksFile); Files.deleteIfExists(newReferencesFile); } } diff --git a/src/main/java/it/cavallium/strangedb/database/DatabaseFileIO.java b/src/main/java/it/cavallium/strangedb/database/DatabaseFileIO.java index 0870a05..c739bd8 100644 --- a/src/main/java/it/cavallium/strangedb/database/DatabaseFileIO.java +++ b/src/main/java/it/cavallium/strangedb/database/DatabaseFileIO.java @@ -6,13 +6,15 @@ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.concurrent.*; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class DatabaseFileIO implements IFileIO { private final AsynchronousFileChannel dataFileChannel; private final AtomicLong firstFreeIndex; + private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); private volatile boolean closed = false; public DatabaseFileIO(Path dataFile) throws IOException { @@ -22,26 +24,36 @@ public class DatabaseFileIO implements IFileIO { @Override public ByteBuffer readAt(long index, int length) throws IOException { - if (closed) throw new IOException("Database closed!"); - ByteBuffer dataBuffer = ByteBuffer.allocate(length); + lock.readLock().lock(); try { - dataFileChannel.read(dataBuffer, index).get(); - } catch (InterruptedException e) { - throw new IOException(e); - } catch (ExecutionException e) { - throw new IOException(e.getCause()); + if (closed) throw new IOException("Database closed!"); + ByteBuffer dataBuffer = ByteBuffer.allocate(length); + try { + dataFileChannel.read(dataBuffer, index).get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + dataBuffer.flip(); + return dataBuffer; + } finally { + lock.readLock().unlock(); } - dataBuffer.flip(); - return dataBuffer; } @Override public int writeAt(long index, int length, ByteBuffer data) throws IOException { - if (closed) throw new IOException("Database closed!"); - return writeAt_(index, length, data); + lock.writeLock().lock(); + try { + return writeAt_(index, length, data); + } finally { + lock.writeLock().unlock(); + } } private int writeAt_(long index, int length, ByteBuffer data) throws IOException { + if (closed) throw new IOException("Database closed!"); if (data.position() != 0) { throw new IOException("You didn't flip the ByteBuffer!"); } @@ -57,16 +69,26 @@ public class DatabaseFileIO implements IFileIO { @Override public long writeAtEnd(int length, ByteBuffer data) throws IOException { - if (closed) throw new IOException("Database closed!"); - long index = firstFreeIndex.getAndAdd(length); - writeAt_(index, length, data); - return index; + lock.writeLock().lock(); + try { + if (closed) throw new IOException("Database closed!"); + long index = firstFreeIndex.getAndAdd(length); + writeAt_(index, length, data); + return index; + } finally { + lock.writeLock().unlock(); + } } @Override public void close() throws IOException { - if (closed) throw new IOException("Database already closed!"); - closed = true; - dataFileChannel.close(); + lock.writeLock().lock(); + try { + if (closed) throw new IOException("Database already closed!"); + closed = true; + dataFileChannel.close(); + } finally { + lock.writeLock().unlock(); + } } } diff --git a/src/main/java/it/cavallium/strangedb/database/IBlocksMetadata.java b/src/main/java/it/cavallium/strangedb/database/IBlocksMetadata.java deleted file mode 100644 index 04f4c5d..0000000 --- a/src/main/java/it/cavallium/strangedb/database/IBlocksMetadata.java +++ /dev/null @@ -1,53 +0,0 @@ -package it.cavallium.strangedb.database; - -import it.cavallium.strangedb.database.blocks.BlockInfo; - -import java.io.IOException; - -public interface IBlocksMetadata { - long EMPTY_BLOCK_ID = -1; - long ERROR_BLOCK_ID = -2; - BlockInfo EMPTY_BLOCK_INFO = new BlockInfo(0, 0); - - /** - * Get block info - * @param blockId block id - * @return block metadata - */ - BlockInfo getBlockInfo(long blockId) throws IOException; - - /** - * New empty block info - * @return block id - */ - default long newBlock() { - return EMPTY_BLOCK_ID; - } - - /** - * Set block info - * @param index block index - * @param size block size - * @return block id - */ - long newBlock(long index, int size) throws IOException; - - /** - * Set block info - * @param blockInfo block info - * @return block id - */ - default long newBlock(BlockInfo blockInfo) throws IOException { - return this.newBlock(blockInfo.getIndex(), blockInfo.getSize()); - } - /** - * Close file - */ - void close() throws IOException; - - /** - * Get total count of blocks - * @return - */ - long getTotalBlocksCount(); -} diff --git a/src/main/java/it/cavallium/strangedb/database/IDatabase.java b/src/main/java/it/cavallium/strangedb/database/IDatabase.java index 2ab0647..fec0a5c 100644 --- a/src/main/java/it/cavallium/strangedb/database/IDatabase.java +++ b/src/main/java/it/cavallium/strangedb/database/IDatabase.java @@ -4,7 +4,7 @@ import java.io.IOException; public interface IDatabase { - int DISK_BLOCK_SIZE = 4096; + int DISK_BLOCK_SIZE = 2*4096; void close() throws IOException; diff --git a/src/main/java/it/cavallium/strangedb/database/IReferencesMetadata.java b/src/main/java/it/cavallium/strangedb/database/IReferencesMetadata.java index 0229718..4009c92 100644 --- a/src/main/java/it/cavallium/strangedb/database/IReferencesMetadata.java +++ b/src/main/java/it/cavallium/strangedb/database/IReferencesMetadata.java @@ -5,35 +5,30 @@ import it.cavallium.strangedb.database.references.ReferenceInfo; import java.io.IOException; public interface IReferencesMetadata { - /** - * Get block of reference - * @param reference reference - * @return block id - */ - long getReferenceBlockId(long reference) throws IOException; /** * Get reference info * @param reference reference * @return reference info + * @throws IOException */ - ReferenceInfo getCleanReference(long reference) throws IOException; + ReferenceInfo getReferenceInfo(long reference) throws IOException; /** - * Allocate a block for a new reference + * Allocate a new reference * - * @param blockId block id + * @param index index + * @param size size * @return reference */ - long newReference(long blockId) throws IOException; + long newReference(long index, int size) throws IOException; /** * Change reference * @param reference reference - * @param cleanerId cleaner id - * @param blockId block id + * @param info info */ - void editReference(long reference, byte cleanerId, long blockId) throws IOException; + void editReference(long reference, ReferenceInfo info) throws IOException; /** * Delete reference diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/BlockInfo.java b/src/main/java/it/cavallium/strangedb/database/blocks/BlockInfo.java deleted file mode 100644 index bf2c9c0..0000000 --- a/src/main/java/it/cavallium/strangedb/database/blocks/BlockInfo.java +++ /dev/null @@ -1,44 +0,0 @@ -package it.cavallium.strangedb.database.blocks; - -import java.util.Objects; -import java.util.StringJoiner; - -public class BlockInfo { - private final long index; - private final int size; - - public BlockInfo(long index, int size) { - this.index = index; - this.size = size; - } - - public long getIndex() { - return index; - } - - public int getSize() { - return size; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - BlockInfo blockInfo = (BlockInfo) o; - return index == blockInfo.index && - size == blockInfo.size; - } - - @Override - public int hashCode() { - return Objects.hash(index, size); - } - - @Override - public String toString() { - return new StringJoiner(", ", BlockInfo.class.getSimpleName() + "[", "]") - .add("index=" + index) - .add("size=" + size) - .toString(); - } -} diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksIO.java b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksIO.java deleted file mode 100644 index 2440579..0000000 --- a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksIO.java +++ /dev/null @@ -1,80 +0,0 @@ -package it.cavallium.strangedb.database.blocks; - -import it.cavallium.strangedb.database.DatabaseFileIO; -import it.cavallium.strangedb.database.IBlocksIO; -import it.cavallium.strangedb.database.IBlocksMetadata; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID; -import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID; - -public class DatabaseBlocksIO implements IBlocksIO { - - private final DatabaseFileIO fileIO; - private final IBlocksMetadata blocksMetadata; - - public DatabaseBlocksIO(DatabaseFileIO fileIO, IBlocksMetadata blocksMetadata) { - this.fileIO = fileIO; - this.blocksMetadata = blocksMetadata; - } - - @Override - public long newBlock(int size, ByteBuffer data) throws IOException { - if (size == 0) { - return EMPTY_BLOCK_ID; - } - if (size < 0) { - throw new IOException("Trying to create a block with size " + size); - } - if (data.limit() < size) { - throw new IOException("Trying to create a block with size " + size + " but with a buffer of size " + data.limit()); - } - long index = fileIO.writeAtEnd(size, data); - return blocksMetadata.newBlock(index, size); - } - - @Override - public ByteBuffer readBlock(long blockId) throws IOException { - if (blockId == EMPTY_BLOCK_ID) { - return ByteBuffer.wrap(new byte[0]); - } - if (blockId == ERROR_BLOCK_ID) { - throw new IOException("Errored block id"); - } - if (blockId < 0) { - throw new IOException("Block id " + blockId + " is not valid"); - } - BlockInfo blockInfo = blocksMetadata.getBlockInfo(blockId); - return fileIO.readAt(blockInfo.getIndex(), blockInfo.getSize()); - } - - public ByteBuffer readBlockSizeAndLastElementOfReferencesList(long blockId) throws IOException { - if (blockId == EMPTY_BLOCK_ID) { - return ByteBuffer.wrap(new byte[0]); - } - if (blockId == ERROR_BLOCK_ID) { - throw new IOException("Errored block id"); - } - if (blockId < 0) { - throw new IOException("Block id " + blockId + " is not valid"); - } - BlockInfo blockInfo = blocksMetadata.getBlockInfo(blockId); - if (blockInfo.getSize() >= Integer.BYTES * 2 + Long.BYTES) { - return fileIO.readAt(blockInfo.getIndex() + blockInfo.getSize() - (Integer.BYTES + Long.BYTES), Integer.BYTES + Long.BYTES); - } else { - return fileIO.readAt(blockInfo.getIndex(), blockInfo.getSize()); - } - } - - @Override - public void close() { - - } -} diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadata.java b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadata.java deleted file mode 100644 index 7a48e4c..0000000 --- a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadata.java +++ /dev/null @@ -1,128 +0,0 @@ -package it.cavallium.strangedb.database.blocks; - - -import it.cavallium.strangedb.database.IBlocksMetadata; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.AsynchronousFileChannel; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicLong; - -import static it.cavallium.strangedb.database.IDatabase.DISK_BLOCK_SIZE; - -public class DatabaseBlocksMetadata implements IBlocksMetadata { - public static final BlockInfo ERROR_BLOCK_INFO = new BlockInfo(-2, 0); - private static final int BLOCK_META_BYTES_COUNT = Long.BYTES + Integer.BYTES; - public static final int BLOCK_META_READS_AT_EVERY_READ = (DISK_BLOCK_SIZE - DISK_BLOCK_SIZE % BLOCK_META_BYTES_COUNT) / BLOCK_META_BYTES_COUNT; - - private final AsynchronousFileChannel metaFileChannel; - private final DatabaseBlocksMetadataCache cache; - private AtomicLong firstFreeBlock; - - public DatabaseBlocksMetadata(Path metaFile) throws IOException { - metaFileChannel = AsynchronousFileChannel.open(metaFile, StandardOpenOption.READ, StandardOpenOption.WRITE); - firstFreeBlock = new AtomicLong(metaFileChannel.size() / BLOCK_META_BYTES_COUNT); - this.cache = new DatabaseBlocksMetadataCache(this::writeBlockToDisk); - } - - @Override - public BlockInfo getBlockInfo(long blockId) throws IOException { - if (blockId == EMPTY_BLOCK_ID) { - return EMPTY_BLOCK_INFO; - } - if (blockId == ERROR_BLOCK_ID) { - throw new IOException("Errored block id"); - } - BlockInfo blockInfo; - if ((blockInfo = cache.get(blockId)) != ERROR_BLOCK_INFO) { - return blockInfo; - } - long position = blockId * BLOCK_META_BYTES_COUNT; - int size = BLOCK_META_READS_AT_EVERY_READ * BLOCK_META_BYTES_COUNT; - long currentFirstFreeBlock = this.firstFreeBlock.get(); - if (blockId > currentFirstFreeBlock) { - return EMPTY_BLOCK_INFO; - } - if (blockId + (size - 1) / BLOCK_META_BYTES_COUNT >= currentFirstFreeBlock) { - size = (int) ((currentFirstFreeBlock - blockId) * BLOCK_META_BYTES_COUNT); - } - int blocksCount = size / BLOCK_META_BYTES_COUNT; - - ByteBuffer buffer = ByteBuffer.allocate(size); - try { - metaFileChannel.read(buffer, position).get(); - } catch (InterruptedException e) { - throw new IOException(e); - } catch (ExecutionException e) { - throw new IOException(e.getCause()); - } - buffer.flip(); - - if (blocksCount < 1) { - throw new IOException("Trying to read <1 blocks"); - } - if (buffer.limit() % BLOCK_META_BYTES_COUNT != 0 || buffer.limit() < BLOCK_META_BYTES_COUNT) { - throw new IOException("The buffer is smaller than the data requested."); - } else if (buffer.limit() != size) { - size = buffer.limit(); - blocksCount = size / BLOCK_META_BYTES_COUNT; - } - - long[] allBlockIds = new long[blocksCount]; - BlockInfo[] allBlockInfo = new BlockInfo[blocksCount]; - - blockInfo = EMPTY_BLOCK_INFO; - for (int delta = 0; delta < blocksCount; delta++) { - long blockToLoad = blockId + delta; - long blockIndex = buffer.getLong(); - int blockSize = buffer.getInt(); - BlockInfo currentBlockInfo = new BlockInfo(blockIndex, blockSize); - allBlockIds[delta] = blockToLoad; - allBlockInfo[delta] = currentBlockInfo; - if (blockToLoad == blockId) { - blockInfo = currentBlockInfo; - } - } - cache.putAll(allBlockIds, allBlockInfo); - return blockInfo; - } - - @Override - public long newBlock(long index, int size) throws IOException { - if (size == 0) { - return EMPTY_BLOCK_ID; - } - long newBlockId = firstFreeBlock.getAndIncrement(); - BlockInfo blockInfo = new BlockInfo(index, size); - cache.put(newBlockId, blockInfo); - return newBlockId; - } - - @Override - public void close() throws IOException { - cache.close(); - metaFileChannel.close(); - } - - private void writeBlockToDisk(long blockId, long index, int size) throws IOException { - ByteBuffer data = ByteBuffer.allocate(BLOCK_META_BYTES_COUNT); - data.putLong(index); - data.putInt(size); - data.flip(); - try { - metaFileChannel.write(data, blockId * BLOCK_META_BYTES_COUNT).get(); - } catch (InterruptedException e) { - throw new IOException(e); - } catch (ExecutionException e) { - throw new IOException(e.getCause()); - } - } - - @Override - public long getTotalBlocksCount() { - return firstFreeBlock.get(); - } -} diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCache.java b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCache.java deleted file mode 100644 index 3a176b4..0000000 --- a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCache.java +++ /dev/null @@ -1,143 +0,0 @@ -package it.cavallium.strangedb.database.blocks; - -import it.unimi.dsi.fastutil.longs.*; -import it.unimi.dsi.fastutil.objects.ObjectArrayList; -import it.unimi.dsi.fastutil.objects.ObjectIterator; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.*; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static it.cavallium.strangedb.database.blocks.DatabaseBlocksMetadata.BLOCK_META_READS_AT_EVERY_READ; - -public class DatabaseBlocksMetadataCache { - - private static final int BASE_QUANTITY = (BLOCK_META_READS_AT_EVERY_READ < 500 ? BLOCK_META_READS_AT_EVERY_READ : 500); - private static final int GOOD_CACHE_SIZE = 140 * BASE_QUANTITY; - private static final int FLUSH_CACHE_SIZE = 300 * BLOCK_META_READS_AT_EVERY_READ; - private static final int MAX_CACHE_SIZE = 400 * BLOCK_META_READS_AT_EVERY_READ; - - private final Long2ObjectMap blocks2Info = Long2ObjectMaps.synchronize(new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE, 0.5f)); - - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); - private final DatabaseBlocksMetadataCacheFlusher flusher; - private volatile boolean closed; - ExecutorService flushExecutorService = Executors.newFixedThreadPool(ForkJoinPool.getCommonPoolParallelism(), (r) -> new Thread(r, "Blocks Flush Thread")); - - public DatabaseBlocksMetadataCache(DatabaseBlocksMetadataCacheFlusher flusher) { - this.flusher = flusher; - } - - public BlockInfo get(long block) throws IOException { - if (closed) throw new IOException("Cache already closed!"); - lock.readLock().lock(); - try { - return blocks2Info.getOrDefault(block, DatabaseBlocksMetadata.ERROR_BLOCK_INFO); - } finally { - lock.readLock().unlock(); - } - } - - public void put(long block, BlockInfo blockInfo) throws IOException { - if (closed) return; - lock.writeLock().lock(); - try { - blocks2Info.put(block, blockInfo); - flush(); - } finally { - lock.writeLock().unlock(); - } - } - - @SuppressWarnings("unchecked") - public void putAll(long[] blocks, BlockInfo[] blockInfos) throws IOException { - if (closed) return; - lock.writeLock().lock(); - try { - Long2ObjectMap blocksInfosToAdd = new Long2ObjectLinkedOpenHashMap<>(blocks, blockInfos, 0.5f); - blocks2Info.putAll(blocksInfosToAdd); - flush(); - } finally { - lock.writeLock().unlock(); - } - } - - private void flush() throws IOException { - if (closed) return; - int blocks2InfoSize = blocks2Info.size(); - if (blocks2InfoSize >= FLUSH_CACHE_SIZE) { - ObjectIterator> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator(); - @SuppressWarnings("unchecked") - ObjectArrayList> tasks = ObjectArrayList.wrap(new Callable[blocks2InfoSize - GOOD_CACHE_SIZE], blocks2InfoSize - GOOD_CACHE_SIZE); - for (int i = 0; i < blocks2InfoSize - GOOD_CACHE_SIZE; i++) { - Long2ObjectMap.Entry entry = entriesIterator.next(); - BlockInfo blockInfo = entry.getValue(); - long blockId = entry.getLongKey(); - long blockPosition = blockInfo.getIndex(); - int blockSize = blockInfo.getSize(); - entriesIterator.remove(); - - tasks.set(i, () -> { - try { - flusher.flush(blockId, blockPosition, blockSize); - } catch (IOException e) { - throw new CompletionException(e); - } - return null; - }); - } - try { - flushExecutorService.invokeAll(tasks); - } catch (InterruptedException e) { - throw new IOException(e.getCause()); - } - } - } - - public void close() throws IOException { - if (!closed) { - closed = true; - lock.writeLock().lock(); - try { - int blocks2InfoSize = blocks2Info.size(); - ObjectIterator> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator(); - @SuppressWarnings("unchecked") - ObjectArrayList> tasks = ObjectArrayList.wrap(new Callable[blocks2InfoSize], blocks2InfoSize); - for (int i = 0; i < blocks2InfoSize; i++) { - Long2ObjectMap.Entry entry = entriesIterator.next(); - BlockInfo blockInfo = entry.getValue(); - long blockId = entry.getLongKey(); - long blockPosition = blockInfo.getIndex(); - int blockSize = blockInfo.getSize(); - entriesIterator.remove(); - tasks.set(i, () -> { - try { - flusher.flush(blockId, blockPosition, blockSize); - } catch (IOException e) { - throw new CompletionException(e); - } - return null; - }); - } - try { - flushExecutorService.invokeAll(tasks); - } catch (InterruptedException e) { - throw new IOException(e.getCause()); - } - flushExecutorService.shutdown(); - try { - if (!flushExecutorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)) - flushExecutorService.shutdownNow(); - } catch (InterruptedException e) { - throw new IOException(e); - } - } finally { - lock.writeLock().unlock(); - } - } - } -} diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCacheFlusher.java b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCacheFlusher.java deleted file mode 100644 index d6d50dd..0000000 --- a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCacheFlusher.java +++ /dev/null @@ -1,8 +0,0 @@ -package it.cavallium.strangedb.database.blocks; - -import java.io.IOException; -import java.util.concurrent.Future; - -public interface DatabaseBlocksMetadataCacheFlusher { - void flush(long key, long value1, int value2) throws IOException; -} diff --git a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesIO.java b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesIO.java index 9af232c..c1d33ee 100644 --- a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesIO.java +++ b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesIO.java @@ -1,67 +1,69 @@ package it.cavallium.strangedb.database.references; +import it.cavallium.strangedb.database.DatabaseFileIO; import it.cavallium.strangedb.database.IReferencesIO; -import it.cavallium.strangedb.database.blocks.DatabaseBlocksIO; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID; - public class DatabaseReferencesIO implements IReferencesIO { - private final DatabaseBlocksIO blocksIO; + private final DatabaseFileIO fileIO; private final DatabaseReferencesMetadata referencesMetadata; - private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); - public DatabaseReferencesIO(DatabaseBlocksIO blocksIO, DatabaseReferencesMetadata referencesMetadata) { - this.blocksIO = blocksIO; + public DatabaseReferencesIO(DatabaseFileIO fileIO, DatabaseReferencesMetadata referencesMetadata) { + this.fileIO = fileIO; this.referencesMetadata = referencesMetadata; } @Override public long allocateReference() throws IOException { - return referencesMetadata.newReference(EMPTY_BLOCK_ID); + return referencesMetadata.newReference(0, 0); } @Override public long allocateReference(int size, ByteBuffer data) throws IOException { - long blockId = (size == 0) ? EMPTY_BLOCK_ID : blocksIO.newBlock(size, data); - return referencesMetadata.newReference(blockId); + long index = writeToFile(size, data); + return referencesMetadata.newReference(index, size); } @Override public void writeToReference(long reference, byte cleanerId, int size, ByteBuffer data) throws IOException { - long blockId = (size == 0) ? EMPTY_BLOCK_ID : blocksIO.newBlock(size, data); - lock.writeLock().lock(); - try { - referencesMetadata.editReference(reference, cleanerId, blockId); - } finally { - lock.writeLock().unlock(); - } + long index = writeToFile(size, data); + referencesMetadata.editReference(reference, new ReferenceInfo(index, size, cleanerId)); } @Override public ByteBuffer readFromReference(long reference) throws IOException { - long blockId; - lock.readLock().lock(); - try { - blockId = referencesMetadata.getReferenceBlockId(reference); - } finally { - lock.readLock().unlock(); - } - return blocksIO.readBlock(blockId); + ReferenceInfo referenceInfo = referencesMetadata.getReferenceInfo(reference); + return fileIO.readAt(referenceInfo.getIndex(), referenceInfo.getSize()); } public ByteBuffer readFromReferenceSizeAndLastElementOfReferencesList(long reference) throws IOException { - long blockId; - lock.readLock().lock(); - try { - blockId = referencesMetadata.getReferenceBlockId(reference); - } finally { - lock.readLock().unlock(); + ReferenceInfo referenceInfo = referencesMetadata.getReferenceInfo(reference); + if (referenceInfo.getSize() >= Integer.BYTES * 2 + Long.BYTES) { + return fileIO.readAt(referenceInfo.getIndex() + referenceInfo.getSize() - (Integer.BYTES + Long.BYTES), Integer.BYTES + Long.BYTES); + } else { + return fileIO.readAt(referenceInfo.getIndex(), referenceInfo.getSize()); } - return blocksIO.readBlockSizeAndLastElementOfReferencesList(blockId); + } + + /** + * + * @param size + * @param data + * @return index + * @throws IOException + */ + private long writeToFile(int size, ByteBuffer data) throws IOException { + if (size == 0 && data == null) return 0; + if (size < 0) { + throw new IOException("Trying to create a block with size " + size); + } + if (data.limit() < size) { + throw new IOException("Trying to create a block with size " + size + " but with a buffer of size " + data.limit()); + } + return fileIO.writeAtEnd(size, data); } } diff --git a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadata.java b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadata.java index ce3c3ab..32bcd66 100644 --- a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadata.java +++ b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadata.java @@ -7,151 +7,125 @@ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID; -import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID; import static it.cavallium.strangedb.database.IDatabase.DISK_BLOCK_SIZE; public class DatabaseReferencesMetadata implements IReferencesMetadata { public static final byte ERRORED_CLEANER = (byte) -1; public static final byte BLANK_DATA_CLEANER = (byte) -2; - public static final ReferenceInfo NONEXISTENT_REFERENCE_INFO = new ReferenceInfo(ERRORED_CLEANER, ERROR_BLOCK_ID); - private static final int REF_META_BYTES_COUNT = Long.BYTES + Byte.BYTES; + public static final ReferenceInfo NONEXISTENT_REFERENCE_INFO = new ReferenceInfo(-1, -1, ERRORED_CLEANER); + private static final int REF_META_BYTES_COUNT = Long.BYTES + Integer.BYTES + Byte.BYTES; public static final int REF_META_READS_AT_EVERY_READ = (DISK_BLOCK_SIZE - DISK_BLOCK_SIZE % REF_META_BYTES_COUNT) / REF_META_BYTES_COUNT; private final AsynchronousFileChannel metaFileChannel; private final DatabaseReferencesMetadataCache cache; private AtomicLong firstFreeReference; - private AtomicLong lastWrittenReference; private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); public DatabaseReferencesMetadata(Path refMetaFile) throws IOException { metaFileChannel = AsynchronousFileChannel.open(refMetaFile, StandardOpenOption.READ, StandardOpenOption.WRITE); firstFreeReference = new AtomicLong(metaFileChannel.size() / REF_META_BYTES_COUNT); - lastWrittenReference = new AtomicLong(firstFreeReference.get() - 1); - this.cache = new DatabaseReferencesMetadataCache(this::writeReferenceToDisk); + this.cache = new DatabaseReferencesMetadataCache(new Flusher()); } @Override - public long getReferenceBlockId(long reference) throws IOException { + public ReferenceInfo getReferenceInfo(long reference) throws IOException { + ReferenceInfo block; + long[] allReferences; + ReferenceInfo[] allInfo; lock.readLock().lock(); try { - return getReferenceBlockId_(reference); + long firstFreeReference = this.firstFreeReference.get(); + if (reference >= firstFreeReference) { + return NONEXISTENT_REFERENCE_INFO; + } + if ((block = cache.getInfo(reference)) != NONEXISTENT_REFERENCE_INFO) { + return block; + } + long position = reference * REF_META_BYTES_COUNT; + int size = REF_META_READS_AT_EVERY_READ * REF_META_BYTES_COUNT; + if (reference + (REF_META_READS_AT_EVERY_READ - 1) >= firstFreeReference) { + size = (int) ((firstFreeReference - reference) * REF_META_BYTES_COUNT); + } + int referencesCount = size / REF_META_BYTES_COUNT; + + ByteBuffer buffer = ByteBuffer.allocate(size); + try { + metaFileChannel.read(buffer, position).get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + buffer.flip(); + + if (referencesCount < 1) { + throw new IOException("Trying to read <1 references"); + } + if (buffer.limit() % REF_META_BYTES_COUNT != 0 || buffer.limit() < REF_META_BYTES_COUNT) { + throw new IOException("The buffer is smaller than the data requested."); + } else if (buffer.limit() != size) { + size = buffer.limit(); + referencesCount = size / REF_META_BYTES_COUNT; + } + + allReferences = new long[referencesCount]; + allInfo = new ReferenceInfo[referencesCount]; + + block = NONEXISTENT_REFERENCE_INFO; + for (int delta = 0; delta < referencesCount; delta++) { + long referenceToLoad = reference + delta; + long currentIndex = buffer.getLong(); + int currentSize = buffer.getInt(); + byte cleanerId = buffer.get(); + ReferenceInfo refInfo = new ReferenceInfo(currentIndex, currentSize, cleanerId); + if (buffer.limit() != 0 && currentIndex != 0xFFFFFFFFFFFFFFFFL) { + allReferences[delta] = referenceToLoad; + allInfo[delta] = refInfo; + if (referenceToLoad == reference) { + block = refInfo; + } + } else { + allReferences[delta] = referenceToLoad; + allInfo[delta] = refInfo; + if (referenceToLoad == reference) { + block = NONEXISTENT_REFERENCE_INFO; + } + } + } } finally { lock.readLock().unlock(); } - } - - private long getReferenceBlockId_(long reference) throws IOException { - long firstFreeReference = this.firstFreeReference.get(); - if (reference >= firstFreeReference) { - return EMPTY_BLOCK_ID; - } - long block; - if ((block = cache.getBlock(reference)) != ERROR_BLOCK_ID) { - return block; - } - long position = reference * REF_META_BYTES_COUNT; - int size = REF_META_READS_AT_EVERY_READ * REF_META_BYTES_COUNT; - if (reference > firstFreeReference) { - return EMPTY_BLOCK_ID; - } - if (reference + (size - 1) / REF_META_BYTES_COUNT >= firstFreeReference) { - size = (int) ((firstFreeReference - reference) * REF_META_BYTES_COUNT); - } - int referencesCount = size / REF_META_BYTES_COUNT; - - ByteBuffer buffer = ByteBuffer.allocate(size); + lock.writeLock().lock(); try { - metaFileChannel.read(buffer, position).get(); - } catch (InterruptedException e) { - throw new IOException(e); - } catch (ExecutionException e) { - throw new IOException(e.getCause()); + cache.putAll(allReferences, allInfo); + } finally { + lock.writeLock().unlock(); } - buffer.flip(); - - if (referencesCount < 1) { - throw new IOException("Trying to read <1 references"); - } - if (buffer.limit() % REF_META_BYTES_COUNT != 0 || buffer.limit() < REF_META_BYTES_COUNT) { - throw new IOException("The buffer is smaller than the data requested."); - } else if (buffer.limit() != size) { - size = buffer.limit(); - referencesCount = size / REF_META_BYTES_COUNT; - } - - long[] allReferences = new long[referencesCount]; - byte[] allCleaners = new byte[referencesCount]; - long[] allBlocks = new long[referencesCount]; - - block = EMPTY_BLOCK_ID; - for (int delta = 0; delta < referencesCount; delta++) { - long referenceToLoad = reference + delta; - long currentBlock = buffer.getLong(); - byte cleanerId = buffer.get(); - if (buffer.limit() != 0 && currentBlock != 0xFFFFFFFFFFFFFFFFL) { - allReferences[delta] = referenceToLoad; - allCleaners[delta] = cleanerId; - allBlocks[delta] = currentBlock; - if (referenceToLoad == reference) { - block = currentBlock; - } - } else { - allReferences[delta] = referenceToLoad; - allCleaners[delta] = cleanerId; - allBlocks[delta] = EMPTY_BLOCK_ID; - if (referenceToLoad == reference) { - block = EMPTY_BLOCK_ID; - } - } - } - cache.putAll(allReferences, allCleaners, allBlocks); return block; } - /** - * This method is SLOW! Use this only for the cleaner - * - * @param reference reference - * @return - * @throws IOException - */ - @Deprecated @Override - public ReferenceInfo getCleanReference(long reference) throws IOException { - lock.readLock().lock(); - try { - getReferenceBlockId_(reference); - return cache.get(reference); - } finally { - lock.readLock().unlock(); - } - } - - @Override - public long newReference(long blockId) throws IOException { + public long newReference(long index, int size) throws IOException { lock.writeLock().lock(); long newReference; try { newReference = firstFreeReference.getAndIncrement(); + cache.put(newReference, new ReferenceInfo(index, size, BLANK_DATA_CLEANER), true); } finally { lock.writeLock().unlock(); } - cache.put(newReference, BLANK_DATA_CLEANER, blockId); return newReference; } @Override - public void editReference(long reference, byte cleanerId, long blockId) throws IOException { + public void editReference(long reference, ReferenceInfo info) throws IOException { lock.writeLock().lock(); try { - cache.put(reference, cleanerId, blockId); + cache.put(reference, info, true); } finally { lock.writeLock().unlock(); } @@ -161,7 +135,7 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata { public void deleteReference(long reference) throws IOException { lock.writeLock().lock(); try { - cache.put(reference, NONEXISTENT_REFERENCE_INFO.getCleanerId(), NONEXISTENT_REFERENCE_INFO.getBlockId()); + cache.put(reference, NONEXISTENT_REFERENCE_INFO, true); } finally { lock.writeLock().unlock(); } @@ -188,20 +162,49 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata { } } - private void writeReferenceToDisk(long reference, byte cleanerId, long blockId) throws IOException { - if (cleanerId == ERRORED_CLEANER) { - throw new IOException("Passing a cleaner with the id of ERRORED_CLIENT"); + private class Flusher implements DatabaseReferencesMetadataCacheFlusher { + private Flusher() { + } - ByteBuffer data = ByteBuffer.allocate(REF_META_BYTES_COUNT); - data.putLong(blockId); - data.put(cleanerId); - data.flip(); - try { - metaFileChannel.write(data, reference * REF_META_BYTES_COUNT).get(); - } catch (InterruptedException e) { - throw new IOException(e); - } catch (ExecutionException e) { - throw new IOException(e.getCause()); + + @Override + public void flush(long reference, ReferenceInfo info, boolean closing) throws IOException { + if (info.getCleanerId() == ERRORED_CLEANER) { + throw new IOException("Passing a cleaner with the id of ERRORED_CLIENT"); + } + ByteBuffer data = ByteBuffer.allocate(REF_META_BYTES_COUNT); + data.putLong(info.getIndex()); + data.putInt(info.getSize()); + data.put(info.getCleanerId()); + data.flip(); + try { + metaFileChannel.write(data, reference * REF_META_BYTES_COUNT).get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + } + + @Override + public void flushMultiple(long referenceStart, ReferenceInfo[] infos) throws IOException { + ByteBuffer data = ByteBuffer.allocate(infos.length * REF_META_BYTES_COUNT); + for (ReferenceInfo info : infos) { + if (info.getCleanerId() == ERRORED_CLEANER) { + throw new IOException("Passing a cleaner with the id of ERRORED_CLIENT"); + } + data.putLong(info.getIndex()); + data.putInt(info.getSize()); + data.put(info.getCleanerId()); + } + data.flip(); + try { + metaFileChannel.write(data, referenceStart * REF_META_BYTES_COUNT).get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } } } diff --git a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCache.java b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCache.java index e474606..1865578 100644 --- a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCache.java +++ b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCache.java @@ -1,21 +1,16 @@ package it.cavallium.strangedb.database.references; -import it.cavallium.strangedb.VariableWrapper; import it.unimi.dsi.fastutil.longs.*; import it.unimi.dsi.fastutil.objects.ObjectArrayList; import it.unimi.dsi.fastutil.objects.ObjectIterator; -import it.unimi.dsi.fastutil.objects.ObjectIterators; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID; -import static it.cavallium.strangedb.database.blocks.DatabaseBlocksMetadata.BLOCK_META_READS_AT_EVERY_READ; import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.NONEXISTENT_REFERENCE_INFO; import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.REF_META_READS_AT_EVERY_READ; @@ -23,11 +18,13 @@ public class DatabaseReferencesMetadataCache { private static final int BASE_QUANTITY = (REF_META_READS_AT_EVERY_READ < 500 ? REF_META_READS_AT_EVERY_READ : 500); private static final int GOOD_CACHE_SIZE = 140 * BASE_QUANTITY; - private static final int FLUSH_CACHE_SIZE = 300 * BLOCK_META_READS_AT_EVERY_READ; + private static final int FLUSH_CACHE_SIZE = 300 * BASE_QUANTITY; private static final int MAX_CACHE_SIZE = 400 * BASE_QUANTITY; + private static final int MODIFIED_FLUSH_CACHE_SIZE = 160 * BASE_QUANTITY; + private static final int MODIFIED_MAX_CACHE_SIZE = 260 * BASE_QUANTITY; - private final Long2LongMap references2Blocks = Long2LongMaps.synchronize(new Long2LongLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f)); - private final Long2ByteMap referencesCleaners = Long2ByteMaps.synchronize(new Long2ByteLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f)); + private final Long2ObjectMap referencesInfos = Long2ObjectMaps.synchronize(new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE, 0.5f)); + private final Long2ObjectMap modifiedInfos = Long2ObjectMaps.synchronize(new Long2ObjectLinkedOpenHashMap<>(MODIFIED_MAX_CACHE_SIZE, 0.5f)); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); private final DatabaseReferencesMetadataCacheFlusher flusher; private volatile boolean closed; @@ -37,60 +34,50 @@ public class DatabaseReferencesMetadataCache { this.flusher = flusher; } - public long getBlock(long reference) throws IOException { + public ReferenceInfo getInfo(long reference) throws IOException { if (closed) throw new IOException("Cache already closed!"); lock.readLock().lock(); try { - return references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID); + return referencesInfos.getOrDefault(reference, NONEXISTENT_REFERENCE_INFO); } finally { lock.readLock().unlock(); } } - public ReferenceInfo get(long reference) throws IOException { - if (closed) throw new IOException("Cache already closed!"); - lock.readLock().lock(); - try { - long blockId = references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID); - if (blockId == ERROR_BLOCK_ID) { - return NONEXISTENT_REFERENCE_INFO; - } - byte cleanerId = referencesCleaners.get(reference); - return new ReferenceInfo(cleanerId, blockId); - } finally { - lock.readLock().unlock(); - } - } - - public void put(long reference, byte cleanerId, long blockId) throws IOException { + public void put(long reference, ReferenceInfo info, boolean modified) throws IOException { if (closed) throw new IOException("Cache already closed!"); lock.writeLock().lock(); try { - if (cleanerId == 0) { + if (info.getCleanerId() == 0) { throw new IOException("Null cleaner id"); } - references2Blocks.put(reference, blockId); - referencesCleaners.put(reference, cleanerId); + referencesInfos.put(reference, info); + if (modified) { + modifiedInfos.put(reference, info); + } flush(); } finally { lock.writeLock().unlock(); } } - public void putAll(long[] references, byte[] cleanerIds, long[] blockIds) throws IOException { + /** + * Put all items in cache + * @param references + * @param infos + * @throws IOException + */ + public void putAll(long[] references, ReferenceInfo[] infos) throws IOException { if (closed) throw new IOException("Cache already closed!"); lock.writeLock().lock(); try { - Long2LongMap referencesBlocksToAdd = new Long2LongLinkedOpenHashMap(references, blockIds, 0.5f); - Long2ByteMap referencesCleanersToAdd = new Long2ByteLinkedOpenHashMap(references, cleanerIds, 0.5f); + Long2ObjectMap referencesBlocksToAdd = new Long2ObjectLinkedOpenHashMap<>(references.length, 0.5f); for (int i = 0; i < references.length; i++) { - if (cleanerIds[i] == 0) { - referencesBlocksToAdd.remove(references[i]); - referencesCleanersToAdd.remove(references[i]); + if (infos[i].getCleanerId() != 0) { + referencesBlocksToAdd.put(references[i], infos[i]); } } - references2Blocks.putAll(referencesBlocksToAdd); - referencesCleaners.putAll(referencesCleanersToAdd); + referencesInfos.putAll(referencesBlocksToAdd); flush(); } finally { lock.writeLock().unlock(); @@ -99,65 +86,79 @@ public class DatabaseReferencesMetadataCache { private void flush() throws IOException { if (closed) return; - int references2BlocksSize = references2Blocks.size(); - if (references2BlocksSize >= FLUSH_CACHE_SIZE) { - ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); - ObjectIterator cleanersIterator = referencesCleaners.long2ByteEntrySet().iterator(); - @SuppressWarnings("unchecked") - ObjectArrayList> tasks = ObjectArrayList.wrap(new Callable[references2BlocksSize - GOOD_CACHE_SIZE], references2BlocksSize - GOOD_CACHE_SIZE); - for (int i = 0; i < references2BlocksSize - GOOD_CACHE_SIZE; i++) { - Long2LongMap.Entry entry = entriesIterator.next(); - Long2ByteMap.Entry cleaner = cleanersIterator.next(); - long refId = entry.getLongKey(); - byte cleanerId = cleaner.getByteValue(); - long blockId = entry.getLongValue(); - entriesIterator.remove(); - cleanersIterator.remove(); + int readOnlyInfoSize = referencesInfos.size(); + int modifiedInfoSize = modifiedInfos.size(); + if (readOnlyInfoSize >= FLUSH_CACHE_SIZE || modifiedInfoSize >= MODIFIED_FLUSH_CACHE_SIZE) { + ObjectIterator> entriesIterator = modifiedInfos.long2ObjectEntrySet().iterator(); + long startId = -1; + long prevId = -1; + LinkedList prevInfos = new LinkedList<>(); + for (int i = 0; i < modifiedInfoSize; i++) { + Long2ObjectMap.Entry entry = entriesIterator.next(); + Long refId = entry.getLongKey(); + ReferenceInfo info = entry.getValue(); - tasks.set(i, () -> { - try { - flusher.flush(refId, cleanerId, blockId); - } catch (IOException e) { - throw new CompletionException(e); - } - return null; - }); + if (info == null) { + throw new IOException("A reference is null"); + } + + if (startId == -1) { + startId = refId; + prevInfos.add(info); + } else if (prevId + 1 == refId) { + prevInfos.add(info); + } else { + flushAction(prevInfos, startId); + + startId = refId; + prevInfos.clear(); + prevInfos.add(info); + } + prevId = refId; } - try { - flushExecutorService.invokeAll(tasks); - } catch (InterruptedException e) { - throw new IOException(e.getCause()); + modifiedInfos.clear(); + flushAction(prevInfos, startId); + } + if (readOnlyInfoSize >= FLUSH_CACHE_SIZE) { + ObjectIterator> entriesIterator = referencesInfos.long2ObjectEntrySet().iterator(); + for (int i = 0; i < readOnlyInfoSize - GOOD_CACHE_SIZE; i++) { + entriesIterator.next(); + entriesIterator.remove(); } } } + private void flushAction(LinkedList prevInfos, long startId) throws IOException { + if (prevInfos.size() > 1) { + flusher.flushMultiple(startId, prevInfos.toArray(new ReferenceInfo[0])); + } else if (prevInfos.size() == 1) { + flusher.flush(startId, prevInfos.getFirst(), false); + } + } + public void close() throws IOException { if (!closed) { closed = true; lock.writeLock().lock(); try { - int references2BlocksSize = references2Blocks.size(); - ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); - ObjectIterator cleanersIterator = referencesCleaners.long2ByteEntrySet().iterator(); + int references2BlocksSize = modifiedInfos.size(); + ObjectIterator> entriesIterator = modifiedInfos.long2ObjectEntrySet().iterator(); @SuppressWarnings("unchecked") ObjectArrayList> tasks = ObjectArrayList.wrap(new Callable[references2BlocksSize], references2BlocksSize); for (int i = 0; i < references2BlocksSize; i++) { - Long2LongMap.Entry entry = entriesIterator.next(); - Long2ByteMap.Entry cleaner = cleanersIterator.next(); - long refId = entry.getLongKey(); - byte cleanerId = cleaner.getByteValue(); - long blockId = entry.getLongValue(); - entriesIterator.remove(); - cleanersIterator.remove(); + Long2ObjectMap.Entry entry = entriesIterator.next(); + Long refId = entry.getLongKey(); + ReferenceInfo info = entry.getValue(); tasks.set(i, () -> { try { - flusher.flush(refId, cleanerId, blockId); + flusher.flush(refId, info, true); } catch (IOException e) { throw new CompletionException(e); } return null; }); } + modifiedInfos.clear(); try { flushExecutorService.invokeAll(tasks); } catch (InterruptedException e) { diff --git a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCacheFlusher.java b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCacheFlusher.java index 3b9e2dd..dff5d57 100644 --- a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCacheFlusher.java +++ b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCacheFlusher.java @@ -1,8 +1,8 @@ package it.cavallium.strangedb.database.references; import java.io.IOException; -import java.util.concurrent.Future; public interface DatabaseReferencesMetadataCacheFlusher { - void flush(long reference, byte cleanerId, long block) throws IOException; + void flush(long reference, ReferenceInfo info, boolean closing) throws IOException; + void flushMultiple(long referenceStart, ReferenceInfo[] info) throws IOException; } diff --git a/src/main/java/it/cavallium/strangedb/database/references/ReferenceInfo.java b/src/main/java/it/cavallium/strangedb/database/references/ReferenceInfo.java index 5db0e73..6d8c792 100644 --- a/src/main/java/it/cavallium/strangedb/database/references/ReferenceInfo.java +++ b/src/main/java/it/cavallium/strangedb/database/references/ReferenceInfo.java @@ -4,41 +4,49 @@ import java.util.Objects; import java.util.StringJoiner; public class ReferenceInfo { + private final long index; + private final int size; private final byte cleanerId; - private final long blockId; - public ReferenceInfo(byte cleanerId, long blockId) { + public ReferenceInfo(long index, int size, byte cleanerId) { + this.index = index; + this.size = size; this.cleanerId = cleanerId; - this.blockId = blockId; + } + + public long getIndex() { + return index; + } + + public int getSize() { + return size; } public byte getCleanerId() { return cleanerId; } - public long getBlockId() { - return blockId; - } - - @Override - public String toString() { - return new StringJoiner(", ", ReferenceInfo.class.getSimpleName() + "[", "]") - .add("cleanerId=" + cleanerId) - .add("blockId=" + blockId) - .toString(); - } - @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ReferenceInfo that = (ReferenceInfo) o; - return cleanerId == that.cleanerId && - blockId == that.blockId; + return index == that.index && + size == that.size && + cleanerId == that.cleanerId; } @Override public int hashCode() { - return Objects.hash(cleanerId, blockId); + return Objects.hash(index); + } + + @Override + public String toString() { + return new StringJoiner(", ", ReferenceInfo.class.getSimpleName() + "[", "]") + .add("index=" + index) + .add("size=" + size) + .add("cleanerId=" + cleanerId) + .toString(); } }