diff --git a/pom.xml b/pom.xml index 5950574..0c0fc28 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ it.cavallium strangedb-core - 1.5.5 + 1.5.7 strangedb-core https://git.ignuranza.net/andreacavalli/strangedb-core diff --git a/src/main/java/it/cavallium/strangedb/database/DatabaseCore.java b/src/main/java/it/cavallium/strangedb/database/DatabaseCore.java index 28b4728..ca27345 100644 --- a/src/main/java/it/cavallium/strangedb/database/DatabaseCore.java +++ b/src/main/java/it/cavallium/strangedb/database/DatabaseCore.java @@ -4,6 +4,7 @@ import it.cavallium.strangedb.database.blocks.DatabaseBlocksIO; import it.cavallium.strangedb.database.blocks.DatabaseBlocksMetadata; import it.cavallium.strangedb.database.references.DatabaseReferencesIO; import it.cavallium.strangedb.database.references.DatabaseReferencesMetadata; +import it.cavallium.strangedb.database.references.ReferenceInfo; import java.io.IOException; import java.nio.ByteBuffer; @@ -11,16 +12,18 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.NONEXISTENT_REFERENCE_INFO; + public class DatabaseCore implements IDatabase { private final DatabaseFileIO fileIO; private final DatabaseBlocksIO blocksIO; - private final DatabaseBlocksMetadata blocksMetadata; + protected final DatabaseBlocksMetadata blocksMetadata; protected final DatabaseReferencesIO referencesIO; protected final DatabaseReferencesMetadata referencesMetadata; - private final Path dataFile; - private final Path blocksMetaFile; - private final Path referencesMetaFile; + protected final Path dataFile; + protected final Path blocksMetaFile; + protected final Path referencesMetaFile; protected volatile boolean closed; public DatabaseCore(Path dataFile, Path blocksMetaFile, Path referencesMetaFile) throws IOException { @@ -82,17 +85,24 @@ public class DatabaseCore implements IDatabase { long referencesCount = databaseToClean.referencesMetadata.getFirstFreeReference(); long blocksCount = databaseToClean.blocksMetadata.getTotalBlocksCount(); long writtenReferences = 0; + long writtenBlocks = 0; for (int referenceID = 0; referenceID < referencesCount; referenceID++) { try { - ByteBuffer buffer = databaseToClean.referencesIO.readFromReference(referenceID); - newDatabase.referencesIO.writeToReference(referenceID, buffer.limit(), buffer); - writtenReferences++; + ReferenceInfo ref = databaseToClean.referencesMetadata.getCleanReference(referenceID); + if (!NONEXISTENT_REFERENCE_INFO.equals(ref)) { + ByteBuffer buffer = databaseToClean.referencesIO.readFromReference(referenceID); + newDatabase.referencesIO.writeToReference(referenceID, ref.getCleanerId(), buffer.limit(), buffer); + writtenReferences++; + if (buffer.limit() > 0) { + writtenBlocks++; + } + } } catch (IOException ex) { System.out.println("Error while reading reference " + referenceID + ". References written: " + writtenReferences); } } - System.out.println("References written: " + writtenReferences + ". Removed " + (blocksCount - writtenReferences) + " blocks. Removed " + (referencesCount - writtenReferences) + " references."); + System.out.println("[Core Cleaner] References written: " + writtenReferences + ". Removed " + (blocksCount - writtenBlocks) + " blocks. Removed " + (referencesCount - writtenReferences) + " references."); databaseToClean.close(); newDatabase.close(); Files.deleteIfExists(newDataFile); diff --git a/src/main/java/it/cavallium/strangedb/database/DatabaseFileIO.java b/src/main/java/it/cavallium/strangedb/database/DatabaseFileIO.java index 557dc28..0870a05 100644 --- a/src/main/java/it/cavallium/strangedb/database/DatabaseFileIO.java +++ b/src/main/java/it/cavallium/strangedb/database/DatabaseFileIO.java @@ -3,59 +3,70 @@ package it.cavallium.strangedb.database; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.SeekableByteChannel; -import java.nio.file.Files; +import java.nio.channels.AsynchronousFileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; public class DatabaseFileIO implements IFileIO { - private final SeekableByteChannel dataFileChannel; - private final Object dataAccessLock = new Object(); - private long firstFreeIndex; + private final AsynchronousFileChannel dataFileChannel; + private final AtomicLong firstFreeIndex; + private volatile boolean closed = false; public DatabaseFileIO(Path dataFile) throws IOException { - synchronized (dataAccessLock) { - dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE); - firstFreeIndex = dataFileChannel.size(); - } + dataFileChannel = AsynchronousFileChannel.open(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE); + firstFreeIndex = new AtomicLong(dataFileChannel.size()); } @Override public ByteBuffer readAt(long index, int length) throws IOException { + if (closed) throw new IOException("Database closed!"); ByteBuffer dataBuffer = ByteBuffer.allocate(length); - dataFileChannel.position(index).read(dataBuffer); + try { + dataFileChannel.read(dataBuffer, index).get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } dataBuffer.flip(); return dataBuffer; } @Override - public void writeAt(long index, int length, ByteBuffer data) throws IOException { - synchronized (dataAccessLock) { - if (data.position() != 0) { - throw new IOException("You didn't flip the ByteBuffer!"); - } - if (firstFreeIndex < index + length) { - firstFreeIndex = index + length; - } - dataFileChannel.position(index).write(data); + public int writeAt(long index, int length, ByteBuffer data) throws IOException { + if (closed) throw new IOException("Database closed!"); + return writeAt_(index, length, data); + } + + private int writeAt_(long index, int length, ByteBuffer data) throws IOException { + if (data.position() != 0) { + throw new IOException("You didn't flip the ByteBuffer!"); + } + firstFreeIndex.updateAndGet((firstFreeIndex) -> firstFreeIndex < index + length ? index + length : firstFreeIndex); + try { + return dataFileChannel.write(data, index).get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); } } @Override public long writeAtEnd(int length, ByteBuffer data) throws IOException { - synchronized (dataAccessLock) { - long index = firstFreeIndex; - firstFreeIndex += length; - writeAt(index, length, data); - return index; - } + if (closed) throw new IOException("Database closed!"); + long index = firstFreeIndex.getAndAdd(length); + writeAt_(index, length, data); + return index; } @Override public void close() throws IOException { - synchronized (dataAccessLock) { - dataFileChannel.close(); - } + if (closed) throw new IOException("Database already closed!"); + closed = true; + dataFileChannel.close(); } } diff --git a/src/main/java/it/cavallium/strangedb/database/IDatabase.java b/src/main/java/it/cavallium/strangedb/database/IDatabase.java index cf5c524..2ab0647 100644 --- a/src/main/java/it/cavallium/strangedb/database/IDatabase.java +++ b/src/main/java/it/cavallium/strangedb/database/IDatabase.java @@ -4,6 +4,8 @@ import java.io.IOException; public interface IDatabase { + int DISK_BLOCK_SIZE = 4096; + void close() throws IOException; boolean isClosed(); diff --git a/src/main/java/it/cavallium/strangedb/database/IFileIO.java b/src/main/java/it/cavallium/strangedb/database/IFileIO.java index 2399984..b2fda11 100644 --- a/src/main/java/it/cavallium/strangedb/database/IFileIO.java +++ b/src/main/java/it/cavallium/strangedb/database/IFileIO.java @@ -18,7 +18,7 @@ public interface IFileIO { * @param length length * @param data bytes */ - void writeAt(long index, int length, ByteBuffer data) throws IOException; + int writeAt(long index, int length, ByteBuffer data) throws IOException; /** * Write *length* bytes in position *index* diff --git a/src/main/java/it/cavallium/strangedb/database/IReferencesIO.java b/src/main/java/it/cavallium/strangedb/database/IReferencesIO.java index 4ef994e..c7693a9 100644 --- a/src/main/java/it/cavallium/strangedb/database/IReferencesIO.java +++ b/src/main/java/it/cavallium/strangedb/database/IReferencesIO.java @@ -21,10 +21,11 @@ public interface IReferencesIO { /** * Write some data to the reference * @param reference reference + * @param cleanerId cleaner id * @param size data size * @param data bytes */ - void writeToReference(long reference, int size, ByteBuffer data) throws IOException; + void writeToReference(long reference, byte cleanerId, int size, ByteBuffer data) throws IOException; /** * Read data from the reference diff --git a/src/main/java/it/cavallium/strangedb/database/IReferencesMetadata.java b/src/main/java/it/cavallium/strangedb/database/IReferencesMetadata.java index 665104a..0229718 100644 --- a/src/main/java/it/cavallium/strangedb/database/IReferencesMetadata.java +++ b/src/main/java/it/cavallium/strangedb/database/IReferencesMetadata.java @@ -1,5 +1,7 @@ package it.cavallium.strangedb.database; +import it.cavallium.strangedb.database.references.ReferenceInfo; + import java.io.IOException; public interface IReferencesMetadata { @@ -8,21 +10,36 @@ public interface IReferencesMetadata { * @param reference reference * @return block id */ - long getReference(long reference) throws IOException; + long getReferenceBlockId(long reference) throws IOException; + + /** + * Get reference info + * @param reference reference + * @return reference info + */ + ReferenceInfo getCleanReference(long reference) throws IOException; /** * Allocate a block for a new reference + * * @param blockId block id * @return reference */ long newReference(long blockId) throws IOException; /** - * Change reference size + * Change reference * @param reference reference + * @param cleanerId cleaner id * @param blockId block id */ - void editReference(long reference, long blockId) throws IOException; + void editReference(long reference, byte cleanerId, long blockId) throws IOException; + + /** + * Delete reference + * @param reference reference + */ + void deleteReference(long reference) throws IOException; /** * Close file diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksIO.java b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksIO.java index a92dd79..f20537f 100644 --- a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksIO.java +++ b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksIO.java @@ -6,6 +6,11 @@ import it.cavallium.strangedb.database.IBlocksMetadata; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID; diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadata.java b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadata.java index c1e4047..636ec90 100644 --- a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadata.java +++ b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadata.java @@ -4,6 +4,7 @@ package it.cavallium.strangedb.database.blocks; import it.cavallium.strangedb.database.IBlocksMetadata; import java.io.IOException; +import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.file.Path; @@ -11,11 +12,14 @@ import java.nio.file.StandardOpenOption; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import static it.cavallium.strangedb.database.IDatabase.DISK_BLOCK_SIZE; + public class DatabaseBlocksMetadata implements IBlocksMetadata { public static final BlockInfo ERROR_BLOCK_INFO = new BlockInfo(-2, 0); + private static final int BLOCK_META_BYTES_COUNT = Long.BYTES + Integer.BYTES; + public static final int BLOCK_META_READS_AT_EVERY_READ = (DISK_BLOCK_SIZE - DISK_BLOCK_SIZE % BLOCK_META_BYTES_COUNT) / BLOCK_META_BYTES_COUNT; private final AsynchronousFileChannel metaFileChannel; - private final int BLOCK_META_BYTES_COUNT = Long.BYTES + Integer.BYTES; private final DatabaseBlocksMetadataCache cache; private long firstFreeBlock; @@ -34,19 +38,48 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata { if ((blockInfo = cache.get(blockId)) != ERROR_BLOCK_INFO) { return blockInfo; } - ByteBuffer buffer = ByteBuffer.allocate(BLOCK_META_BYTES_COUNT); + long position = blockId * BLOCK_META_BYTES_COUNT; + int size = BLOCK_META_READS_AT_EVERY_READ * BLOCK_META_BYTES_COUNT; + if (blockId + (size - 1) / BLOCK_META_BYTES_COUNT >= firstFreeBlock) { + size = (int) ((firstFreeBlock - blockId) * BLOCK_META_BYTES_COUNT); + } + int blocksCount = size / BLOCK_META_BYTES_COUNT; + + ByteBuffer buffer = ByteBuffer.allocate(size); try { - metaFileChannel.read(buffer, blockId * BLOCK_META_BYTES_COUNT).get(); + metaFileChannel.read(buffer, position).get(); } catch (InterruptedException e) { throw new IOException(e); } catch (ExecutionException e) { throw new IOException(e.getCause()); } buffer.flip(); - long index = buffer.getLong(); - int size = buffer.getInt(); - blockInfo = new BlockInfo(index, size); - cache.put(blockId, blockInfo); + + if (blocksCount < 1) { + throw new IOException("Trying to read <1 blocks"); + } + if (buffer.limit() % BLOCK_META_BYTES_COUNT != 0 || buffer.limit() < BLOCK_META_BYTES_COUNT) { + throw new IOException("The buffer is smaller than the data requested."); + } else if (buffer.limit() != size) { + size = buffer.limit(); + blocksCount = size / BLOCK_META_BYTES_COUNT; + } + + long[] allBlockIds = new long[blocksCount]; + BlockInfo[] allBlockInfo = new BlockInfo[blocksCount]; + + for (int delta = 0; delta < blocksCount; delta++) { + long blockToLoad = blockId + delta; + long blockIndex = buffer.getLong(); + int blockSize = buffer.getInt(); + BlockInfo currentBlockInfo = new BlockInfo(blockIndex, blockSize); + allBlockIds[delta] = blockToLoad; + allBlockInfo[delta] = currentBlockInfo; + if (blockToLoad == blockId) { + blockInfo = currentBlockInfo; + } + } + cache.putAll(allBlockIds, allBlockInfo); return blockInfo; } diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCache.java b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCache.java index 7716b28..2dc4aa3 100644 --- a/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCache.java +++ b/src/main/java/it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCache.java @@ -1,25 +1,31 @@ package it.cavallium.strangedb.database.blocks; -import it.unimi.dsi.fastutil.longs.Long2ObjectLinkedOpenHashMap; -import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.*; import it.unimi.dsi.fastutil.objects.ObjectIterator; import java.io.IOException; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.*; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static it.cavallium.strangedb.database.blocks.DatabaseBlocksMetadata.BLOCK_META_READS_AT_EVERY_READ; public class DatabaseBlocksMetadataCache { - private static final int GOOD_CACHE_SIZE = 70000; - private static final int MAX_CACHE_SIZE = 100000; + private static final int BASE_QUANTITY = (BLOCK_META_READS_AT_EVERY_READ < 500 ? BLOCK_META_READS_AT_EVERY_READ : 500); + private static final int GOOD_CACHE_SIZE = 140 * BASE_QUANTITY; + private static final int FLUSH_CACHE_SIZE = 300 * BLOCK_META_READS_AT_EVERY_READ; + private static final int MAX_CACHE_SIZE = 400 * BLOCK_META_READS_AT_EVERY_READ; - private final Long2ObjectMap blocks2Info = new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE); - private final Object readAccessLock = new Object(); - private final Object writeAccessLock = new Object(); + private final Long2ObjectMap blocks2Info = new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE, 0.5f); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); private final DatabaseBlocksMetadataCacheFlusher flusher; private volatile boolean closed; + private ExecutorService flushService = Executors.newSingleThreadExecutor(); + private volatile boolean flushing; public DatabaseBlocksMetadataCache(DatabaseBlocksMetadataCacheFlusher flusher) { this.flusher = flusher; @@ -27,75 +33,132 @@ public class DatabaseBlocksMetadataCache { public BlockInfo get(long block) throws IOException { if (closed) throw new IOException("Cache already closed!"); - synchronized (readAccessLock) { + lock.readLock().lock(); + try { return blocks2Info.getOrDefault(block, DatabaseBlocksMetadata.ERROR_BLOCK_INFO); + } finally { + lock.readLock().unlock(); } } - public void put(long block, BlockInfo blockInfo) throws IOException { + public void put(long block, BlockInfo blockInfo) { if (closed) return; - synchronized (readAccessLock) { - synchronized (writeAccessLock) { - blocks2Info.put(block, blockInfo); - } + lock.writeLock().lock(); + try { + blocks2Info.put(block, blockInfo); + } finally { + lock.writeLock().unlock(); + flushAsync(); + } + } + + @SuppressWarnings("unchecked") + public void putAll(long[] blocks, BlockInfo[] blockInfos) { + if (closed) return; + lock.writeLock().lock(); + try { + Long2ObjectMap blocksInfosToAdd = new Long2ObjectLinkedOpenHashMap<>(blocks, blockInfos, 0.5f); + blocks2Info.putAll(blocksInfosToAdd); + } finally { + lock.writeLock().unlock(); + flushAsync(); + } + } + + private void flushAsync() { + if (blocks2Info.size() >= FLUSH_CACHE_SIZE && !flushing) { + flushing = true; + flushService.execute(() -> { + try { + flush(); + } catch (IOException e) { + throw new RejectedExecutionException(e); + } + }); } - this.flush(); } private void flush() throws IOException { - if (closed) return; + if (closed) { + flushing = false; + return; + } int blocks2InfoSize = blocks2Info.size(); - if (blocks2InfoSize > MAX_CACHE_SIZE) { - synchronized (writeAccessLock) { + if (blocks2InfoSize >= FLUSH_CACHE_SIZE) { + lock.writeLock().lock(); + try { ObjectIterator> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator(); - List> entriesToFlush = new LinkedList<>(); - while (blocks2InfoSize > GOOD_CACHE_SIZE) { + + List> entriesToFlush = new ArrayList<>(); + while (blocks2InfoSize >= GOOD_CACHE_SIZE) { Long2ObjectMap.Entry entry = entriesIterator.next(); BlockInfo blockInfo = entry.getValue(); entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize())); entriesIterator.remove(); + if (entriesToFlush.size() >= 1000) { - executeAsyncFlush(entriesToFlush); + awaitFlushWriteEnd(entriesToFlush); } blocks2InfoSize--; } - executeAsyncFlush(entriesToFlush); + awaitFlushWriteEnd(entriesToFlush); + } finally { + flushing = false; + lock.writeLock().unlock(); } + } else { + flushing = false; } } public void close() throws IOException { - synchronized (readAccessLock) { - synchronized (writeAccessLock) { - if (!closed) { - closed = true; - int blocks2InfoSize = blocks2Info.size(); - ObjectIterator> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator(); - List> entriesToFlush = new LinkedList<>(); - while (blocks2InfoSize > 0) { - Long2ObjectMap.Entry entry = entriesIterator.next(); - BlockInfo blockInfo = entry.getValue(); - entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize())); - entriesIterator.remove(); - if (entriesToFlush.size() >= 1000) { - executeAsyncFlush(entriesToFlush); - } - blocks2InfoSize--; + if (!closed) { + closed = true; + stopFlushService(); + lock.writeLock().lock(); + try { + int blocks2InfoSize = blocks2Info.size(); + ObjectIterator> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator(); + + List> entriesToFlush = new LinkedList<>(); + while (blocks2InfoSize > 0) { + Long2ObjectMap.Entry entry = entriesIterator.next(); + BlockInfo blockInfo = entry.getValue(); + entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize())); + entriesIterator.remove(); + + if (entriesToFlush.size() >= 1000) { + awaitFlushWriteEnd(entriesToFlush); } - executeAsyncFlush(entriesToFlush); + blocks2InfoSize--; } + awaitFlushWriteEnd(entriesToFlush); + } finally { + lock.writeLock().unlock(); } } } - private void executeAsyncFlush(List> entriesToFlush) throws IOException { + private void stopFlushService() { + flushService.shutdown(); + try { flushService.awaitTermination(180, TimeUnit.SECONDS); } catch (InterruptedException e) {} + if (!flushService.isTerminated()) { + flushService.shutdownNow(); + } + } + + private void awaitFlushWriteEnd(List> entriesToFlush) throws IOException { try { - for (Future entryToFlush : entriesToFlush) { - entryToFlush.get(); - } - } catch (InterruptedException e) { - throw new IOException(e); - } catch (ExecutionException e) { + entriesToFlush.parallelStream().forEach((entry) -> { + try { + entry.get(); + } catch (InterruptedException e) { + throw new CompletionException(e); + } catch (ExecutionException e) { + throw new CompletionException(e.getCause()); + } + }); + } catch (CompletionException e) { throw new IOException(e.getCause()); } finally { entriesToFlush.clear(); diff --git a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesIO.java b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesIO.java index f342228..d54579b 100644 --- a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesIO.java +++ b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesIO.java @@ -5,6 +5,8 @@ import it.cavallium.strangedb.database.blocks.DatabaseBlocksIO; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID; @@ -12,6 +14,7 @@ public class DatabaseReferencesIO implements IReferencesIO { private final DatabaseBlocksIO blocksIO; private final DatabaseReferencesMetadata referencesMetadata; + private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); public DatabaseReferencesIO(DatabaseBlocksIO blocksIO, DatabaseReferencesMetadata referencesMetadata) { this.blocksIO = blocksIO; @@ -30,14 +33,24 @@ public class DatabaseReferencesIO implements IReferencesIO { } @Override - public void writeToReference(long reference, int size, ByteBuffer data) throws IOException { - long blockId = (size == 0) ? EMPTY_BLOCK_ID : blocksIO.newBlock(size, data); - referencesMetadata.editReference(reference, blockId); + public void writeToReference(long reference, byte cleanerId, int size, ByteBuffer data) throws IOException { + lock.writeLock().lock(); + try { + long blockId = (size == 0) ? EMPTY_BLOCK_ID : blocksIO.newBlock(size, data); + referencesMetadata.editReference(reference, cleanerId, blockId); + } finally { + lock.writeLock().unlock(); + } } @Override public ByteBuffer readFromReference(long reference) throws IOException { - long blockId = referencesMetadata.getReference(reference); - return blocksIO.readBlock(blockId); + lock.readLock().lock(); + try { + long blockId = referencesMetadata.getReferenceBlockId(reference); + return blocksIO.readBlock(blockId); + } finally { + lock.readLock().unlock(); + } } } diff --git a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadata.java b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadata.java index aee0fa1..b8b17be 100644 --- a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadata.java +++ b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadata.java @@ -7,79 +7,203 @@ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID; import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID; +import static it.cavallium.strangedb.database.IDatabase.DISK_BLOCK_SIZE; public class DatabaseReferencesMetadata implements IReferencesMetadata { + public static final byte ERRORED_CLEANER = (byte) -1; + public static final byte BLANK_DATA_CLEANER = (byte) -2; + public static final ReferenceInfo NONEXISTENT_REFERENCE_INFO = new ReferenceInfo(ERRORED_CLEANER, ERROR_BLOCK_ID); + private static final int REF_META_BYTES_COUNT = Long.BYTES + Integer.BYTES; + public static final int REF_META_READS_AT_EVERY_READ = (DISK_BLOCK_SIZE - DISK_BLOCK_SIZE % REF_META_BYTES_COUNT) / REF_META_BYTES_COUNT; + private final AsynchronousFileChannel metaFileChannel; - private final int REF_META_BYTES_COUNT = Long.BYTES; private final DatabaseReferencesMetadataCache cache; private long firstFreeReference; + private long lastWrittenReference; + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); public DatabaseReferencesMetadata(Path refMetaFile) throws IOException { metaFileChannel = AsynchronousFileChannel.open(refMetaFile, StandardOpenOption.READ, StandardOpenOption.WRITE); firstFreeReference = metaFileChannel.size() / REF_META_BYTES_COUNT; + lastWrittenReference = firstFreeReference - 1; this.cache = new DatabaseReferencesMetadataCache(this::writeReferenceToDisk); } @Override - public long getReference(long reference) throws IOException { + public long getReferenceBlockId(long reference) throws IOException { + lock.readLock().lock(); + try { + return getReferenceBlockId_(reference); + } finally { + lock.readLock().unlock(); + } + } + + private long getReferenceBlockId_(long reference) throws IOException { if (reference >= firstFreeReference) { return EMPTY_BLOCK_ID; } long block; - if ((block = cache.get(reference)) != ERROR_BLOCK_ID) { + if ((block = cache.getBlock(reference)) != ERROR_BLOCK_ID) { return block; } - ByteBuffer buffer = ByteBuffer.allocate(REF_META_BYTES_COUNT); + long position = reference * REF_META_BYTES_COUNT; + int size = REF_META_READS_AT_EVERY_READ * REF_META_BYTES_COUNT; + if (reference + (size - 1) / REF_META_BYTES_COUNT >= firstFreeReference) { + size = (int) ((firstFreeReference - reference) * REF_META_BYTES_COUNT); + } + int referencesCount = size / REF_META_BYTES_COUNT; + + ByteBuffer buffer = ByteBuffer.allocate(size); try { - metaFileChannel.read(buffer, reference * REF_META_BYTES_COUNT).get(); + metaFileChannel.read(buffer, position).get(); } catch (InterruptedException e) { throw new IOException(e); } catch (ExecutionException e) { throw new IOException(e.getCause()); } buffer.flip(); - block = buffer.getLong(); - if (buffer.limit() != 0 && block != 0xFFFFFFFFFFFFFFFFL) { - cache.put(reference, block); - return block; - } else { - cache.put(reference, EMPTY_BLOCK_ID); - return EMPTY_BLOCK_ID; + + if (referencesCount < 1) { + throw new IOException("Trying to read <1 references"); + } + if (buffer.limit() % REF_META_BYTES_COUNT != 0 || buffer.limit() < REF_META_BYTES_COUNT) { + throw new IOException("The buffer is smaller than the data requested."); + } else if (buffer.limit() != size) { + size = buffer.limit(); + referencesCount = size / REF_META_BYTES_COUNT; + } + + long[] allReferences = new long[referencesCount]; + byte[] allCleaners = new byte[referencesCount]; + long[] allBlocks = new long[referencesCount]; + + block = EMPTY_BLOCK_ID; + for (int delta = 0; delta < referencesCount; delta++) { + long referenceToLoad = reference + delta; + long currentBlock = buffer.getLong(); + byte cleanerId = (byte) buffer.getInt(); + if (buffer.limit() != 0 && currentBlock != 0xFFFFFFFFFFFFFFFFL) { + allReferences[delta] = referenceToLoad; + allCleaners[delta] = cleanerId; + allBlocks[delta] = currentBlock; + if (referenceToLoad == reference) { + block = currentBlock; + } + } else { + allReferences[delta] = referenceToLoad; + allCleaners[delta] = cleanerId; + allBlocks[delta] = EMPTY_BLOCK_ID; + if (referenceToLoad == reference) { + block = EMPTY_BLOCK_ID; + } + } + } + + for (int delta = 0; delta < referencesCount; delta++) { + if (allCleaners[delta] == 0) { + System.out.println("ro"); + } + } + cache.putAll(allReferences, allCleaners, allBlocks); + return block; + } + + /** + * This method is SLOW! Use this only for the cleaner + * @param reference reference + * @return + * @throws IOException + */ + @Deprecated + @Override + public ReferenceInfo getCleanReference(long reference) throws IOException { + lock.readLock().lock(); + try { + getReferenceBlockId_(reference); + return cache.get(reference); + } finally { + lock.readLock().unlock(); } } @Override public long newReference(long blockId) throws IOException { - long newReference = firstFreeReference++; - cache.put(newReference, blockId); - return newReference; + lock.writeLock().lock(); + try { + long newReference = firstFreeReference++; + cache.put(newReference, BLANK_DATA_CLEANER, blockId); + return newReference; + } finally { + lock.writeLock().unlock(); + } } @Override - public void editReference(long reference, long blockId) throws IOException { - cache.put(reference, blockId); + public void editReference(long reference, byte cleanerId, long blockId) throws IOException { + lock.writeLock().lock(); + try { + cache.put(reference, cleanerId, blockId); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public void deleteReference(long reference) throws IOException { + lock.writeLock().lock(); + try { + cache.put(reference, NONEXISTENT_REFERENCE_INFO.getCleanerId(), NONEXISTENT_REFERENCE_INFO.getBlockId()); + } finally { + lock.writeLock().unlock(); + } } @Override public void close() throws IOException { - cache.close(); - metaFileChannel.close(); + lock.writeLock().lock(); + try { + cache.close(); + metaFileChannel.close(); + } finally { + lock.writeLock().unlock(); + } } @Override public long getFirstFreeReference() { - return firstFreeReference; + synchronized (lock.readLock()) { + return firstFreeReference; + } } - private Future writeReferenceToDisk(long reference, long blockId) { + private Future writeReferenceToDisk(long reference, byte cleanerId, long blockId) { + if (cleanerId == ERRORED_CLEANER) { + return CompletableFuture.failedFuture(new IOException("Passing a cleaner with the id of ERRORED_CLIENT")); + } ByteBuffer data = ByteBuffer.allocate(REF_META_BYTES_COUNT); data.putLong(blockId); + data.putInt(cleanerId & 0xFF); data.flip(); + while (lastWrittenReference < reference - 1) { + ByteBuffer emptyData = ByteBuffer.allocate(REF_META_BYTES_COUNT); + emptyData.putLong(ERROR_BLOCK_ID); + emptyData.putInt(ERRORED_CLEANER & 0xFF); + emptyData.flip(); + metaFileChannel.write(emptyData, ++lastWrittenReference * REF_META_BYTES_COUNT); + } + if (reference > lastWrittenReference) { + lastWrittenReference = reference; + } return metaFileChannel.write(data, reference * REF_META_BYTES_COUNT); } diff --git a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCache.java b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCache.java index e31cdae..19b2f3f 100644 --- a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCache.java +++ b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCache.java @@ -4,108 +4,192 @@ import it.unimi.dsi.fastutil.longs.*; import it.unimi.dsi.fastutil.objects.ObjectIterator; import java.io.IOException; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; +import java.util.concurrent.*; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID; +import static it.cavallium.strangedb.database.blocks.DatabaseBlocksMetadata.BLOCK_META_READS_AT_EVERY_READ; +import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.*; public class DatabaseReferencesMetadataCache { - private static final int GOOD_CACHE_SIZE = 70000; - private static final int MAX_CACHE_SIZE = 100000; + private static final int BASE_QUANTITY = (REF_META_READS_AT_EVERY_READ < 500 ? REF_META_READS_AT_EVERY_READ : 500); + private static final int GOOD_CACHE_SIZE = 140 * BASE_QUANTITY; + private static final int FLUSH_CACHE_SIZE = 300 * BLOCK_META_READS_AT_EVERY_READ; + private static final int MAX_CACHE_SIZE = 400 * BASE_QUANTITY; - private final Long2LongMap references2Blocks = new Long2LongLinkedOpenHashMap(MAX_CACHE_SIZE); - private final Object readAccessLock = new Object(); - private final Object writeAccessLock = new Object(); + private final Long2LongMap references2Blocks = new Long2LongLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f); + private final Long2ByteMap referencesCleaners = new Long2ByteLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); private final DatabaseReferencesMetadataCacheFlusher flusher; private volatile boolean closed; + private ExecutorService flushService = Executors.newSingleThreadExecutor(); + private volatile boolean flushing; public DatabaseReferencesMetadataCache(DatabaseReferencesMetadataCacheFlusher flusher) { this.flusher = flusher; } - public long get(long reference) throws IOException { - synchronized (readAccessLock) { - if (closed) throw new IOException("Cache already closed!"); + public long getBlock(long reference) throws IOException { + if (closed) throw new IOException("Cache already closed!"); + lock.readLock().lock(); + try { return references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID); + } finally { + lock.readLock().unlock(); } } - public void put(long reference, long blockId) throws IOException { - synchronized (readAccessLock) { - synchronized (writeAccessLock) { - if (closed) return; - references2Blocks.put(reference, blockId); + public ReferenceInfo get(long reference) throws IOException { + if (closed) throw new IOException("Cache already closed!"); + lock.readLock().lock(); + try { + long blockId = references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID); + if (blockId == ERROR_BLOCK_ID) { + return NONEXISTENT_REFERENCE_INFO; } + byte cleanerId = referencesCleaners.get(reference); + return new ReferenceInfo(cleanerId, blockId); + } finally { + lock.readLock().unlock(); + } + } + + public void put(long reference, byte cleanerId, long blockId) throws IOException { + if (closed) throw new IOException("Cache already closed!"); + lock.writeLock().lock(); + try { + if (cleanerId == 0) { + throw new IOException("Null cleaner id"); + } + references2Blocks.put(reference, blockId); + referencesCleaners.put(reference, cleanerId); + } finally { + lock.writeLock().unlock(); + flushAsync(); + } + } + + public void putAll(long[] references, byte[] cleanerIds, long[] blockIds) throws IOException { + if (closed) throw new IOException("Cache already closed!"); + lock.writeLock().lock(); + try { + for (int i = 0; i < references.length; i++) { + if (cleanerIds[i] == 0) { + throw new IOException("Null cleaner id"); + } + } + Long2LongMap referencesBlocksToAdd = new Long2LongLinkedOpenHashMap(references, blockIds, 0.5f); + Long2ByteMap referencesCleanersToAdd = new Long2ByteLinkedOpenHashMap(references, cleanerIds, 0.5f); + references2Blocks.putAll(referencesBlocksToAdd); + referencesCleaners.putAll(referencesCleanersToAdd); + } finally { + lock.writeLock().unlock(); + flushAsync(); + } + } + + private void flushAsync() { + if (references2Blocks.size() >= FLUSH_CACHE_SIZE && !flushing) { + flushing = true; + flushService.execute(() -> { + try { + flush(); + } catch (IOException e) { + throw new RejectedExecutionException(e); + } + }); } - this.flush(); } private void flush() throws IOException { - synchronized (readAccessLock) { - synchronized (writeAccessLock) { - if (closed) return; - int references2BlocksSize = references2Blocks.size(); - if (references2BlocksSize > MAX_CACHE_SIZE) { - synchronized (writeAccessLock) { - ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); - List> entriesToFlush = new LinkedList<>(); - while (references2BlocksSize > GOOD_CACHE_SIZE) { - Long2LongMap.Entry entry = entriesIterator.next(); - entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue())); - entriesIterator.remove(); - if (entriesToFlush.size() >= 1000) { - executeAsyncFlush(entriesToFlush); - } - references2BlocksSize--; - } - executeAsyncFlush(entriesToFlush); + if (closed) { + flushing = false; + return; + } + int references2BlocksSize = references2Blocks.size(); + if (references2BlocksSize >= FLUSH_CACHE_SIZE) { + lock.writeLock().lock(); + try { + ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); + ObjectIterator cleanersIterator = referencesCleaners.long2ByteEntrySet().iterator(); + List> entriesToFlush = new ArrayList<>(); + while (references2BlocksSize >= GOOD_CACHE_SIZE) { + Long2LongMap.Entry entry = entriesIterator.next(); + Long2ByteMap.Entry cleaner = cleanersIterator.next(); + entriesToFlush.add(flusher.flush(entry.getLongKey(), cleaner.getByteValue(), entry.getLongValue())); + entriesIterator.remove(); + cleanersIterator.remove(); + if (entriesToFlush.size() >= 1000) { + awaitFlushWriteEnd(entriesToFlush); } + references2BlocksSize--; } + awaitFlushWriteEnd(entriesToFlush); + } finally { + flushing = false; + lock.writeLock().unlock(); } + } else { + flushing = false; } } public void close() throws IOException { - synchronized (readAccessLock) { - synchronized (writeAccessLock) { - if (!closed) { - closed = true; - int references2BlocksSize = references2Blocks.size(); - ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); - List> entriesToFlush = new LinkedList<>(); - while (references2BlocksSize > 0) { - Long2LongMap.Entry entry = entriesIterator.next(); - entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue())); - entriesIterator.remove(); - if (entriesToFlush.size() >= 1000) { - executeAsyncFlush(entriesToFlush); - } - references2BlocksSize--; + if (!closed) { + closed = true; + stopFlushService(); + lock.writeLock().lock(); + try { + int references2BlocksSize = references2Blocks.size(); + ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); + ObjectIterator cleanersIterator = referencesCleaners.long2ByteEntrySet().iterator(); + List> entriesToFlush = new LinkedList<>(); + while (references2BlocksSize > 0) { + Long2LongMap.Entry entry = entriesIterator.next(); + Long2ByteMap.Entry cleaner = cleanersIterator.next(); + entriesToFlush.add(flusher.flush(entry.getLongKey(), cleaner.getByteValue(), entry.getLongValue())); + entriesIterator.remove(); + cleanersIterator.remove(); + + if (entriesToFlush.size() >= 1000) { + awaitFlushWriteEnd(entriesToFlush); } - executeAsyncFlush(entriesToFlush); + references2BlocksSize--; } + awaitFlushWriteEnd(entriesToFlush); + } finally { + lock.writeLock().unlock(); } } } - private void executeAsyncFlush(List> entriesToFlush) throws IOException { - synchronized (readAccessLock) { - synchronized (writeAccessLock) { + private void stopFlushService() { + flushService.shutdown(); + try { flushService.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) {} + if (!flushService.isTerminated()) { + flushService.shutdownNow(); + } + } + + private void awaitFlushWriteEnd(List> entriesToFlush) throws IOException { + try { + entriesToFlush.parallelStream().forEach((entry) -> { try { - for (Future entryToFlush : entriesToFlush) { - entryToFlush.get(); - } + entry.get(); } catch (InterruptedException e) { - throw new IOException(e); + throw new CompletionException(e); } catch (ExecutionException e) { - throw new IOException(e.getCause()); - } finally { - entriesToFlush.clear(); + throw new CompletionException(e.getCause()); } - } + }); + } catch (CompletionException e) { + throw new IOException(e.getCause()); + } finally { + entriesToFlush.clear(); } } } diff --git a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCacheFlusher.java b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCacheFlusher.java index 7c5f231..05d016a 100644 --- a/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCacheFlusher.java +++ b/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCacheFlusher.java @@ -4,5 +4,5 @@ import java.io.IOException; import java.util.concurrent.Future; public interface DatabaseReferencesMetadataCacheFlusher { - Future flush(long key, long value) throws IOException; + Future flush(long reference, byte cleanerId, long block) throws IOException; } diff --git a/src/main/java/it/cavallium/strangedb/database/references/ReferenceInfo.java b/src/main/java/it/cavallium/strangedb/database/references/ReferenceInfo.java new file mode 100644 index 0000000..5db0e73 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/references/ReferenceInfo.java @@ -0,0 +1,44 @@ +package it.cavallium.strangedb.database.references; + +import java.util.Objects; +import java.util.StringJoiner; + +public class ReferenceInfo { + private final byte cleanerId; + private final long blockId; + + public ReferenceInfo(byte cleanerId, long blockId) { + this.cleanerId = cleanerId; + this.blockId = blockId; + } + + public byte getCleanerId() { + return cleanerId; + } + + public long getBlockId() { + return blockId; + } + + @Override + public String toString() { + return new StringJoiner(", ", ReferenceInfo.class.getSimpleName() + "[", "]") + .add("cleanerId=" + cleanerId) + .add("blockId=" + blockId) + .toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ReferenceInfo that = (ReferenceInfo) o; + return cleanerId == that.cleanerId && + blockId == that.blockId; + } + + @Override + public int hashCode() { + return Objects.hash(cleanerId, blockId); + } +}