Here is a sample; alter the names:

Yoyodyne, Inc., hereby disclaims all copyright interest in the program `Gnomovision' (which makes passes at compilers) written by James Hacker.

signature of Ty Coon, 1 April 1989 Ty Coon, President of Vice diff --git a/ b/ new file mode 100644 index 0000000..61ae648 --- /dev/null +++ b/ @@ -0,0 +1,3 @@ +# StrangeDb Core + +Strange database core. diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..5950574 --- /dev/null +++ b/pom.xml @@ -0,0 +1,82 @@ + + + + 4.0.0 + + it.cavallium + strangedb-core + 1.5.5 + + strangedb-core + + + + UTF-8 + 10 + 10 + + + + + sonatype-snapshots + sonatype snapshots repo + + + + + + + junit + junit + 4.11 + test + + + it.unimi.dsi + fastutil + 8.2.2 + + + org.apache.commons + commons-lang3 + 3.5 + + + + + + + + maven-clean-plugin + 3.0.0 + + + + maven-resources-plugin + 3.0.2 + + + maven-compiler-plugin + 3.7.0 + + + maven-surefire-plugin + 2.22.1 + + + maven-jar-plugin + 3.0.2 + + + maven-install-plugin + 2.5.2 + + + maven-deploy-plugin + 2.8.2 + + + + + diff --git a/src/main/java/it/cavallium/strangedb/ b/src/main/java/it/cavallium/strangedb/ new file mode 100644 index 0000000..ad6c9f5 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/ @@ -0,0 +1,10 @@ +package it.cavallium.strangedb; + +public class VariableWrapper { + + public T var; + + public VariableWrapper(T value) { + this.var = value; + } +} diff --git a/src/main/java/it/cavallium/strangedb/database/ b/src/main/java/it/cavallium/strangedb/database/ new file mode 100644 index 0000000..28b4728 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/ @@ -0,0 +1,102 @@ +package it.cavallium.strangedb.database; + +import it.cavallium.strangedb.database.blocks.DatabaseBlocksIO; +import it.cavallium.strangedb.database.blocks.DatabaseBlocksMetadata; +import it.cavallium.strangedb.database.references.DatabaseReferencesIO; +import it.cavallium.strangedb.database.references.DatabaseReferencesMetadata; + +import; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; + +public class DatabaseCore implements IDatabase { + + private final DatabaseFileIO fileIO; + private final DatabaseBlocksIO blocksIO; + private final DatabaseBlocksMetadata blocksMetadata; + protected final DatabaseReferencesIO referencesIO; + protected final DatabaseReferencesMetadata referencesMetadata; + private final Path dataFile; + private final Path blocksMetaFile; + private final Path referencesMetaFile; + protected volatile boolean closed; + + public DatabaseCore(Path dataFile, Path blocksMetaFile, Path referencesMetaFile) throws IOException { + if (Files.notExists(dataFile)) { + Files.createFile(dataFile); + } + if (Files.notExists(blocksMetaFile)) { + Files.createFile(blocksMetaFile); + } + if (Files.notExists(referencesMetaFile)) { + Files.createFile(referencesMetaFile); + } + this.dataFile = dataFile; + this.blocksMetaFile = blocksMetaFile; + this.referencesMetaFile = referencesMetaFile; + this.fileIO = new DatabaseFileIO(dataFile); + this.blocksMetadata = new DatabaseBlocksMetadata(blocksMetaFile); + this.blocksIO = new DatabaseBlocksIO(fileIO, blocksMetadata); + this.referencesMetadata = new DatabaseReferencesMetadata(referencesMetaFile); + this.referencesIO = new DatabaseReferencesIO(blocksIO, referencesMetadata); + } + + @Override + public synchronized void close() throws IOException { + if (this.closed) { + throw new IOException("The database has been already closed!"); + } + this.referencesMetadata.close(); + this.blocksMetadata.close(); + this.fileIO.close(); + this.closed = true; + } + + @Override + public boolean isClosed() { + return closed; + } + + @Override + public void closeAndClean() throws IOException { + if (!this.closed) { + this.close(); + } + Path newDataFile = dataFile.resolveSibling("compressed-data-file.tmp"); + Path newBlocksFile = blocksMetaFile.resolveSibling("compressed-blocks-file.tmp"); + Path newReferencesFile = referencesMetaFile.resolveSibling("compressed-references-file.tmp"); + Path backupDataFile = dataFile.resolveSibling("backup-data.db.bak"); + Path backupBlocksFile = blocksMetaFile.resolveSibling("backup-blocks.dat.bak"); + Path backupReferencesFile = referencesMetaFile.resolveSibling("backup-references.dat.bak"); + Files.copy(dataFile, backupDataFile, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.COPY_ATTRIBUTES); + Files.copy(blocksMetaFile, backupBlocksFile, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.COPY_ATTRIBUTES); + Files.copy(referencesMetaFile, backupReferencesFile, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.COPY_ATTRIBUTES); + Files.move(dataFile, newDataFile, StandardCopyOption.REPLACE_EXISTING); + Files.move(blocksMetaFile, newBlocksFile, StandardCopyOption.REPLACE_EXISTING); + Files.move(referencesMetaFile, newReferencesFile, StandardCopyOption.REPLACE_EXISTING); + DatabaseCore databaseToClean = new DatabaseCore(newDataFile, newBlocksFile, newReferencesFile); + DatabaseCore newDatabase = new DatabaseCore(dataFile, blocksMetaFile, referencesMetaFile); + + long referencesCount = databaseToClean.referencesMetadata.getFirstFreeReference(); + long blocksCount = databaseToClean.blocksMetadata.getTotalBlocksCount(); + long writtenReferences = 0; + + for (int referenceID = 0; referenceID < referencesCount; referenceID++) { + try { + ByteBuffer buffer = databaseToClean.referencesIO.readFromReference(referenceID); + newDatabase.referencesIO.writeToReference(referenceID, buffer.limit(), buffer); + writtenReferences++; + } catch (IOException ex) { + System.out.println("Error while reading reference " + referenceID + ". References written: " + writtenReferences); + } + } + System.out.println("References written: " + writtenReferences + ". Removed " + (blocksCount - writtenReferences) + " blocks. Removed " + (referencesCount - writtenReferences) + " references."); + databaseToClean.close(); + newDatabase.close(); + Files.deleteIfExists(newDataFile); + Files.deleteIfExists(newBlocksFile); + Files.deleteIfExists(newReferencesFile); + } +} diff --git a/src/main/java/it/cavallium/strangedb/database/ b/src/main/java/it/cavallium/strangedb/database/ new file mode 100644 index 0000000..557dc28 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/ @@ -0,0 +1,61 @@ +package it.cavallium.strangedb.database; + + +import; +import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +public class DatabaseFileIO implements IFileIO { + + private final SeekableByteChannel dataFileChannel; + private final Object dataAccessLock = new Object(); + private long firstFreeIndex; + + public DatabaseFileIO(Path dataFile) throws IOException { + synchronized (dataAccessLock) { + dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE); + firstFreeIndex = dataFileChannel.size(); + } + } + + @Override + public ByteBuffer readAt(long index, int length) throws IOException { + ByteBuffer dataBuffer = ByteBuffer.allocate(length); + dataFileChannel.position(index).read(dataBuffer); + dataBuffer.flip(); + return dataBuffer; + } + + @Override + public void writeAt(long index, int length, ByteBuffer data) throws IOException { + synchronized (dataAccessLock) { + if (data.position() != 0) { + throw new IOException("You didn't flip the ByteBuffer!"); + } + if (firstFreeIndex < index + length) { + firstFreeIndex = index + length; + } + dataFileChannel.position(index).write(data); + } + } + + @Override + public long writeAtEnd(int length, ByteBuffer data) throws IOException { + synchronized (dataAccessLock) { + long index = firstFreeIndex; + firstFreeIndex += length; + writeAt(index, length, data); + return index; + } + } + + @Override + public void close() throws IOException { + synchronized (dataAccessLock) { + dataFileChannel.close(); + } + } +} diff --git a/src/main/java/it/cavallium/strangedb/database/ b/src/main/java/it/cavallium/strangedb/database/ new file mode 100644 index 0000000..1e6fc50 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/ @@ -0,0 +1,26 @@ +package it.cavallium.strangedb.database; + +import; +import java.nio.ByteBuffer; + +public interface IBlocksIO { + /** + * Allocate a block + * @param size block size + * @param data block data + * @return the block id + */ + long newBlock(int size, ByteBuffer data) throws IOException; + + /** + * Read a block + * @param blockId block id + * @return block data + */ + ByteBuffer readBlock(long blockId) throws IOException; + + /** + * Close file + */ + void close(); +} diff --git a/src/main/java/it/cavallium/strangedb/database/ b/src/main/java/it/cavallium/strangedb/database/ new file mode 100644 index 0000000..04f4c5d --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/ @@ -0,0 +1,53 @@ +package it.cavallium.strangedb.database; + +import it.cavallium.strangedb.database.blocks.BlockInfo; + +import; + +public interface IBlocksMetadata { + long EMPTY_BLOCK_ID = -1; + long ERROR_BLOCK_ID = -2; + BlockInfo EMPTY_BLOCK_INFO = new BlockInfo(0, 0); + + /** + * Get block info + * @param blockId block id + * @return block metadata + */ + BlockInfo getBlockInfo(long blockId) throws IOException; + + /** + * New empty block info + * @return block id + */ + default long newBlock() { + return EMPTY_BLOCK_ID; + } + + /** + * Set block info + * @param index block index + * @param size block size + * @return block id + */ + long newBlock(long index, int size) throws IOException; + + /** + * Set block info + * @param blockInfo block info + * @return block id + */ + default long newBlock(BlockInfo blockInfo) throws IOException { + return this.newBlock(blockInfo.getIndex(), blockInfo.getSize()); + } + /** + * Close file + */ + void close() throws IOException; + + /** + * Get total count of blocks + * @return + */ + long getTotalBlocksCount(); +} diff --git a/src/main/java/it/cavallium/strangedb/database/ b/src/main/java/it/cavallium/strangedb/database/ new file mode 100644 index 0000000..cf5c524 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/ @@ -0,0 +1,12 @@ +package it.cavallium.strangedb.database; + +import; + +public interface IDatabase { + + void close() throws IOException; + + boolean isClosed(); + + void closeAndClean() throws IOException; +} diff --git a/src/main/java/it/cavallium/strangedb/database/ b/src/main/java/it/cavallium/strangedb/database/ new file mode 100644 index 0000000..2399984 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/ @@ -0,0 +1,35 @@ +package it.cavallium.strangedb.database; + +import*; +import java.nio.ByteBuffer; + +public interface IFileIO { + /** + * Read *length* bytes in position *index* + * @param index index + * @param length length + * @return bytes + */ + ByteBuffer readAt(long index, int length) throws IOException; + + /** + * Write *length* bytes in position *index* + * @param index index + * @param length length + * @param data bytes + */ + void writeAt(long index, int length, ByteBuffer data) throws IOException; + + /** + * Write *length* bytes in position *index* + * @param length length + * @param data bytes + * @return index + */ + long writeAtEnd(int length, ByteBuffer data) throws IOException; + + /** + * Close the file + */ + void close() throws IOException; +} diff --git a/src/main/java/it/cavallium/strangedb/database/ b/src/main/java/it/cavallium/strangedb/database/ new file mode 100644 index 0000000..4ef994e --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/ @@ -0,0 +1,35 @@ +package it.cavallium.strangedb.database; + +import; +import java.nio.ByteBuffer; + +public interface IReferencesIO { + /** + * Allocate a new empty reference + * @return the new reference + */ + long allocateReference() throws IOException; + + /** + * Allocate a new reference with that data + * @param size data size + * @param data bytes + * @return the new reference + */ + long allocateReference(int size, ByteBuffer data) throws IOException; + + /** + * Write some data to the reference + * @param reference reference + * @param size data size + * @param data bytes + */ + void writeToReference(long reference, int size, ByteBuffer data) throws IOException; + + /** + * Read data from the reference + * @param reference reference + * @return bytes + */ + ByteBuffer readFromReference(long reference) throws IOException; +} diff --git a/src/main/java/it/cavallium/strangedb/database/ b/src/main/java/it/cavallium/strangedb/database/ new file mode 100644 index 0000000..665104a --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/ @@ -0,0 +1,33 @@ +package it.cavallium.strangedb.database; + +import; + +public interface IReferencesMetadata { + /** + * Get block of reference + * @param reference reference + * @return block id + */ + long getReference(long reference) throws IOException; + + /** + * Allocate a block for a new reference + * @param blockId block id + * @return reference + */ + long newReference(long blockId) throws IOException; + + /** + * Change reference size + * @param reference reference + * @param blockId block id + */ + void editReference(long reference, long blockId) throws IOException; + + /** + * Close file + */ + void close() throws IOException; + + long getFirstFreeReference(); +} diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/ b/src/main/java/it/cavallium/strangedb/database/blocks/ new file mode 100644 index 0000000..bf2c9c0 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/blocks/ @@ -0,0 +1,44 @@ +package it.cavallium.strangedb.database.blocks; + +import java.util.Objects; +import java.util.StringJoiner; + +public class BlockInfo { + private final long index; + private final int size; + + public BlockInfo(long index, int size) { + this.index = index; + this.size = size; + } + + public long getIndex() { + return index; + } + + public int getSize() { + return size; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + BlockInfo blockInfo = (BlockInfo) o; + return index == blockInfo.index && + size == blockInfo.size; + } + + @Override + public int hashCode() { + return Objects.hash(index, size); + } + + @Override + public String toString() { + return new StringJoiner(", ", BlockInfo.class.getSimpleName() + "[", "]") + .add("index=" + index) + .add("size=" + size) + .toString(); + } +} diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/ b/src/main/java/it/cavallium/strangedb/database/blocks/ new file mode 100644 index 0000000..a92dd79 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/blocks/ @@ -0,0 +1,44 @@ +package it.cavallium.strangedb.database.blocks; + +import it.cavallium.strangedb.database.DatabaseFileIO; +import it.cavallium.strangedb.database.IBlocksIO; +import it.cavallium.strangedb.database.IBlocksMetadata; + +import; +import java.nio.ByteBuffer; + +import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID; + +public class DatabaseBlocksIO implements IBlocksIO { + + private final DatabaseFileIO fileIO; + private final IBlocksMetadata blocksMetadata; + + public DatabaseBlocksIO(DatabaseFileIO fileIO, IBlocksMetadata blocksMetadata) { + this.fileIO = fileIO; + this.blocksMetadata = blocksMetadata; + } + + @Override + public long newBlock(int size, ByteBuffer data) throws IOException { + if (size == 0) { + return EMPTY_BLOCK_ID; + } + long index = fileIO.writeAtEnd(size, data); + return blocksMetadata.newBlock(index, size); + } + + @Override + public ByteBuffer readBlock(long blockId) throws IOException { + if (blockId == EMPTY_BLOCK_ID) { + return ByteBuffer.wrap(new byte[0]); + } + BlockInfo blockInfo = blocksMetadata.getBlockInfo(blockId); + return fileIO.readAt(blockInfo.getIndex(), blockInfo.getSize()); + } + + @Override + public void close() { + + } +} diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/ b/src/main/java/it/cavallium/strangedb/database/blocks/ new file mode 100644 index 0000000..c1e4047 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/blocks/ @@ -0,0 +1,82 @@ +package it.cavallium.strangedb.database.blocks; + + +import it.cavallium.strangedb.database.IBlocksMetadata; + +import; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +public class DatabaseBlocksMetadata implements IBlocksMetadata { + public static final BlockInfo ERROR_BLOCK_INFO = new BlockInfo(-2, 0); + + private final AsynchronousFileChannel metaFileChannel; + private final int BLOCK_META_BYTES_COUNT = Long.BYTES + Integer.BYTES; + private final DatabaseBlocksMetadataCache cache; + private long firstFreeBlock; + + public DatabaseBlocksMetadata(Path metaFile) throws IOException { + metaFileChannel =, StandardOpenOption.READ, StandardOpenOption.WRITE); + firstFreeBlock = metaFileChannel.size() / BLOCK_META_BYTES_COUNT; + this.cache = new DatabaseBlocksMetadataCache(this::writeBlockToDisk); + } + + @Override + public BlockInfo getBlockInfo(long blockId) throws IOException { + if (blockId == EMPTY_BLOCK_ID) { + return EMPTY_BLOCK_INFO; + } + BlockInfo blockInfo; + if ((blockInfo = cache.get(blockId)) != ERROR_BLOCK_INFO) { + return blockInfo; + } + ByteBuffer buffer = ByteBuffer.allocate(BLOCK_META_BYTES_COUNT); + try { +, blockId * BLOCK_META_BYTES_COUNT).get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + buffer.flip(); + long index = buffer.getLong(); + int size = buffer.getInt(); + blockInfo = new BlockInfo(index, size); + cache.put(blockId, blockInfo); + return blockInfo; + } + + @Override + public long newBlock(long index, int size) throws IOException { + if (size == 0) { + return EMPTY_BLOCK_ID; + } + long newBlockId = firstFreeBlock++; + BlockInfo blockInfo = new BlockInfo(index, size); + cache.put(newBlockId, blockInfo); + return newBlockId; + } + + @Override + public void close() throws IOException { + cache.close(); + metaFileChannel.close(); + } + + private Future writeBlockToDisk(long blockId, long index, int size) { + ByteBuffer data = ByteBuffer.allocate(BLOCK_META_BYTES_COUNT); + data.putLong(index); + data.putInt(size); + data.flip(); + return metaFileChannel.write(data, blockId * BLOCK_META_BYTES_COUNT); + } + + @Override + public long getTotalBlocksCount() { + return firstFreeBlock; + } +} diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/ b/src/main/java/it/cavallium/strangedb/database/blocks/ new file mode 100644 index 0000000..7716b28 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/blocks/ @@ -0,0 +1,104 @@ +package it.cavallium.strangedb.database.blocks; + +import it.unimi.dsi.fastutil.longs.Long2ObjectLinkedOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.objects.ObjectIterator; + +import; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +public class DatabaseBlocksMetadataCache { + + private static final int GOOD_CACHE_SIZE = 70000; + private static final int MAX_CACHE_SIZE = 100000; + + private final Long2ObjectMap blocks2Info = new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE); + private final Object readAccessLock = new Object(); + private final Object writeAccessLock = new Object(); + private final DatabaseBlocksMetadataCacheFlusher flusher; + private volatile boolean closed; + + public DatabaseBlocksMetadataCache(DatabaseBlocksMetadataCacheFlusher flusher) { + this.flusher = flusher; + } + + public BlockInfo get(long block) throws IOException { + if (closed) throw new IOException("Cache already closed!"); + synchronized (readAccessLock) { + return blocks2Info.getOrDefault(block, DatabaseBlocksMetadata.ERROR_BLOCK_INFO); + } + } + + public void put(long block, BlockInfo blockInfo) throws IOException { + if (closed) return; + synchronized (readAccessLock) { + synchronized (writeAccessLock) { + blocks2Info.put(block, blockInfo); + } + } + this.flush(); + } + + private void flush() throws IOException { + if (closed) return; + int blocks2InfoSize = blocks2Info.size(); + if (blocks2InfoSize > MAX_CACHE_SIZE) { + synchronized (writeAccessLock) { + ObjectIterator> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator(); + List> entriesToFlush = new LinkedList<>(); + while (blocks2InfoSize > GOOD_CACHE_SIZE) { + Long2ObjectMap.Entry entry =; + BlockInfo blockInfo = entry.getValue(); + entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize())); + entriesIterator.remove(); + if (entriesToFlush.size() >= 1000) { + executeAsyncFlush(entriesToFlush); + } + blocks2InfoSize--; + } + executeAsyncFlush(entriesToFlush); + } + } + } + + public void close() throws IOException { + synchronized (readAccessLock) { + synchronized (writeAccessLock) { + if (!closed) { + closed = true; + int blocks2InfoSize = blocks2Info.size(); + ObjectIterator> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator(); + List> entriesToFlush = new LinkedList<>(); + while (blocks2InfoSize > 0) { + Long2ObjectMap.Entry entry =; + BlockInfo blockInfo = entry.getValue(); + entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize())); + entriesIterator.remove(); + if (entriesToFlush.size() >= 1000) { + executeAsyncFlush(entriesToFlush); + } + blocks2InfoSize--; + } + executeAsyncFlush(entriesToFlush); + } + } + } + } + + private void executeAsyncFlush(List> entriesToFlush) throws IOException { + try { + for (Future entryToFlush : entriesToFlush) { + entryToFlush.get(); + } + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } finally { + entriesToFlush.clear(); + } + } +} diff --git a/src/main/java/it/cavallium/strangedb/database/blocks/ b/src/main/java/it/cavallium/strangedb/database/blocks/ new file mode 100644 index 0000000..95865fb --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/blocks/ @@ -0,0 +1,8 @@ +package it.cavallium.strangedb.database.blocks; + +import; +import java.util.concurrent.Future; + +public interface DatabaseBlocksMetadataCacheFlusher { + Future flush(long key, long value1, int value2) throws IOException; +} diff --git a/src/main/java/it/cavallium/strangedb/database/references/ b/src/main/java/it/cavallium/strangedb/database/references/ new file mode 100644 index 0000000..f342228 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/references/ @@ -0,0 +1,43 @@ +package it.cavallium.strangedb.database.references; + +import it.cavallium.strangedb.database.IReferencesIO; +import it.cavallium.strangedb.database.blocks.DatabaseBlocksIO; + +import; +import java.nio.ByteBuffer; + +import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID; + +public class DatabaseReferencesIO implements IReferencesIO { + + private final DatabaseBlocksIO blocksIO; + private final DatabaseReferencesMetadata referencesMetadata; + + public DatabaseReferencesIO(DatabaseBlocksIO blocksIO, DatabaseReferencesMetadata referencesMetadata) { + this.blocksIO = blocksIO; + this.referencesMetadata = referencesMetadata; + } + + @Override + public long allocateReference() throws IOException { + return referencesMetadata.newReference(EMPTY_BLOCK_ID); + } + + @Override + public long allocateReference(int size, ByteBuffer data) throws IOException { + long blockId = (size == 0) ? EMPTY_BLOCK_ID : blocksIO.newBlock(size, data); + return referencesMetadata.newReference(blockId); + } + + @Override + public void writeToReference(long reference, int size, ByteBuffer data) throws IOException { + long blockId = (size == 0) ? EMPTY_BLOCK_ID : blocksIO.newBlock(size, data); + referencesMetadata.editReference(reference, blockId); + } + + @Override + public ByteBuffer readFromReference(long reference) throws IOException { + long blockId = referencesMetadata.getReference(reference); + return blocksIO.readBlock(blockId); + } +} diff --git a/src/main/java/it/cavallium/strangedb/database/references/ b/src/main/java/it/cavallium/strangedb/database/references/ new file mode 100644 index 0000000..aee0fa1 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/references/ @@ -0,0 +1,86 @@ +package it.cavallium.strangedb.database.references; + +import it.cavallium.strangedb.database.IReferencesMetadata; + +import; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID; +import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID; + +public class DatabaseReferencesMetadata implements IReferencesMetadata { + private final AsynchronousFileChannel metaFileChannel; + private final int REF_META_BYTES_COUNT = Long.BYTES; + private final DatabaseReferencesMetadataCache cache; + private long firstFreeReference; + + public DatabaseReferencesMetadata(Path refMetaFile) throws IOException { + metaFileChannel =, StandardOpenOption.READ, StandardOpenOption.WRITE); + firstFreeReference = metaFileChannel.size() / REF_META_BYTES_COUNT; + this.cache = new DatabaseReferencesMetadataCache(this::writeReferenceToDisk); + } + + @Override + public long getReference(long reference) throws IOException { + if (reference >= firstFreeReference) { + return EMPTY_BLOCK_ID; + } + long block; + if ((block = cache.get(reference)) != ERROR_BLOCK_ID) { + return block; + } + ByteBuffer buffer = ByteBuffer.allocate(REF_META_BYTES_COUNT); + try { +, reference * REF_META_BYTES_COUNT).get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + buffer.flip(); + block = buffer.getLong(); + if (buffer.limit() != 0 && block != 0xFFFFFFFFFFFFFFFFL) { + cache.put(reference, block); + return block; + } else { + cache.put(reference, EMPTY_BLOCK_ID); + return EMPTY_BLOCK_ID; + } + } + + @Override + public long newReference(long blockId) throws IOException { + long newReference = firstFreeReference++; + cache.put(newReference, blockId); + return newReference; + } + + @Override + public void editReference(long reference, long blockId) throws IOException { + cache.put(reference, blockId); + } + + @Override + public void close() throws IOException { + cache.close(); + metaFileChannel.close(); + } + + @Override + public long getFirstFreeReference() { + return firstFreeReference; + } + + private Future writeReferenceToDisk(long reference, long blockId) { + ByteBuffer data = ByteBuffer.allocate(REF_META_BYTES_COUNT); + data.putLong(blockId); + data.flip(); + return metaFileChannel.write(data, reference * REF_META_BYTES_COUNT); + } + +} diff --git a/src/main/java/it/cavallium/strangedb/database/references/ b/src/main/java/it/cavallium/strangedb/database/references/ new file mode 100644 index 0000000..e31cdae --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/references/ @@ -0,0 +1,111 @@ +package it.cavallium.strangedb.database.references; + +import it.unimi.dsi.fastutil.longs.*; +import it.unimi.dsi.fastutil.objects.ObjectIterator; + +import; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID; + +public class DatabaseReferencesMetadataCache { + + private static final int GOOD_CACHE_SIZE = 70000; + private static final int MAX_CACHE_SIZE = 100000; + + private final Long2LongMap references2Blocks = new Long2LongLinkedOpenHashMap(MAX_CACHE_SIZE); + private final Object readAccessLock = new Object(); + private final Object writeAccessLock = new Object(); + private final DatabaseReferencesMetadataCacheFlusher flusher; + private volatile boolean closed; + + public DatabaseReferencesMetadataCache(DatabaseReferencesMetadataCacheFlusher flusher) { + this.flusher = flusher; + } + + public long get(long reference) throws IOException { + synchronized (readAccessLock) { + if (closed) throw new IOException("Cache already closed!"); + return references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID); + } + } + + public void put(long reference, long blockId) throws IOException { + synchronized (readAccessLock) { + synchronized (writeAccessLock) { + if (closed) return; + references2Blocks.put(reference, blockId); + } + } + this.flush(); + } + + private void flush() throws IOException { + synchronized (readAccessLock) { + synchronized (writeAccessLock) { + if (closed) return; + int references2BlocksSize = references2Blocks.size(); + if (references2BlocksSize > MAX_CACHE_SIZE) { + synchronized (writeAccessLock) { + ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); + List> entriesToFlush = new LinkedList<>(); + while (references2BlocksSize > GOOD_CACHE_SIZE) { + Long2LongMap.Entry entry =; + entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue())); + entriesIterator.remove(); + if (entriesToFlush.size() >= 1000) { + executeAsyncFlush(entriesToFlush); + } + references2BlocksSize--; + } + executeAsyncFlush(entriesToFlush); + } + } + } + } + } + + public void close() throws IOException { + synchronized (readAccessLock) { + synchronized (writeAccessLock) { + if (!closed) { + closed = true; + int references2BlocksSize = references2Blocks.size(); + ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); + List> entriesToFlush = new LinkedList<>(); + while (references2BlocksSize > 0) { + Long2LongMap.Entry entry =; + entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue())); + entriesIterator.remove(); + if (entriesToFlush.size() >= 1000) { + executeAsyncFlush(entriesToFlush); + } + references2BlocksSize--; + } + executeAsyncFlush(entriesToFlush); + } + } + } + } + + private void executeAsyncFlush(List> entriesToFlush) throws IOException { + synchronized (readAccessLock) { + synchronized (writeAccessLock) { + try { + for (Future entryToFlush : entriesToFlush) { + entryToFlush.get(); + } + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } finally { + entriesToFlush.clear(); + } + } + } + } +} diff --git a/src/main/java/it/cavallium/strangedb/database/references/ b/src/main/java/it/cavallium/strangedb/database/references/ new file mode 100644 index 0000000..7c5f231 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/database/references/ @@ -0,0 +1,8 @@ +package it.cavallium.strangedb.database.references; + +import; +import java.util.concurrent.Future; + +public interface DatabaseReferencesMetadataCacheFlusher { + Future flush(long key, long value) throws IOException; +} diff --git a/src/main/java/it/cavallium/strangedb/functionalinterfaces/ b/src/main/java/it/cavallium/strangedb/functionalinterfaces/ new file mode 100644 index 0000000..6e82578 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/functionalinterfaces/ @@ -0,0 +1,15 @@ +package it.cavallium.strangedb.functionalinterfaces; + +import; +import java.util.Objects; + +@FunctionalInterface +public interface ConsumerWithIO { + + void accept(T t) throws IOException; + + default ConsumerWithIO andThen(ConsumerWithIO after) { + Objects.requireNonNull(after); + return (T t) -> { accept(t); after.accept(t); }; + } +} diff --git a/src/main/java/it/cavallium/strangedb/functionalinterfaces/ b/src/main/java/it/cavallium/strangedb/functionalinterfaces/ new file mode 100644 index 0000000..ae814c9 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/functionalinterfaces/ @@ -0,0 +1,78 @@ +package it.cavallium.strangedb.functionalinterfaces; + +import; +import java.util.Objects; + +/** + * Represents a function that accepts one argument and produces a result. + * + *

This is a functional interface + * whose functional method is {@link #apply(Object)}. + * + * @param the type of the input to the function + * @param the type of the result of the function + * + * @since 1.8 + */ +@FunctionalInterface +public interface FunctionWithIO { + + /** + * Applies this function to the given argument. + * + * @param t the function argument + * @return the function result + */ + R apply(T t) throws IOException; + + /** + * Returns a composed function that first applies the {@code before} + * function to its input, and then applies this function to the result. + * If evaluation of either function throws an exception, it is relayed to + * the caller of the composed function. + * + * @param the type of input to the {@code before} function, and to the + * composed function + * @param before the function to apply before this function is applied + * @return a composed function that first applies the {@code before} + * function and then applies this function + * @throws NullPointerException if before is null + * + * @see #andThen(FunctionWithIO) + */ + default FunctionWithIO compose(FunctionWithIO before) { + Objects.requireNonNull(before); + return (V v) -> apply(before.apply(v)); + } + + /** + * Returns a composed function that first applies this function to + * its input, and then applies the {@code after} function to the result. + * If evaluation of either function throws an exception, it is relayed to + * the caller of the composed function. + * + * @param the type of output of the {@code after} function, and of the + * composed function + * @param after the function to apply after this function is applied + * @return a composed function that first applies this function and then + * applies the {@code after} function + * @throws NullPointerException if after is null + * + * @see #compose(FunctionWithIO) + */ + default FunctionWithIO andThen(FunctionWithIO after) { + Objects.requireNonNull(after); + return (T t) -> after.apply(apply(t)); + } + + /** + * Returns a function that always returns its input argument. + * + * @param the type of the input and output objects to the function + * @return a function that always returns its input argument + */ + static FunctionWithIO identity() { + return t -> t; + } +} + diff --git a/src/main/java/it/cavallium/strangedb/functionalinterfaces/ b/src/main/java/it/cavallium/strangedb/functionalinterfaces/ new file mode 100644 index 0000000..2061f99 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/functionalinterfaces/ @@ -0,0 +1,5 @@ +package it.cavallium.strangedb.functionalinterfaces; + +public interface Long2LongConsumer { + void accept(long a, long b); +} diff --git a/src/main/java/it/cavallium/strangedb/functionalinterfaces/ b/src/main/java/it/cavallium/strangedb/functionalinterfaces/ new file mode 100644 index 0000000..1cbc30e --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/functionalinterfaces/ @@ -0,0 +1,19 @@ +package it.cavallium.strangedb.functionalinterfaces; + +import; + +@FunctionalInterface +public interface RunnableWithIO { + /** + * When an object implementing interface Runnable is used + * to create a thread, starting the thread causes the object's + * run method to be called in that separately executing + * thread. + *

+ * The general contract of the method run is that it may + * take any action whatsoever. + * + * @see java.lang.Thread#run() + */ + public abstract void run() throws IOException; +} diff --git a/src/main/java/it/cavallium/strangedb/functionalinterfaces/ b/src/main/java/it/cavallium/strangedb/functionalinterfaces/ new file mode 100644 index 0000000..d38a8f9 --- /dev/null +++ b/src/main/java/it/cavallium/strangedb/functionalinterfaces/ @@ -0,0 +1,8 @@ +package it.cavallium.strangedb.functionalinterfaces; + +import; + +@FunctionalInterface +public interface SupplierWithIO { + public T getWithIO() throws IOException; +} \ No newline at end of file