From d4599ca4952b2ca83b1c3c57eaa290347c29f475 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 3 Mar 2019 00:54:47 +0100 Subject: [PATCH] Async metadata --- .../java/org/warp/cowdb/IBlocksMetadata.java | 1 + .../database/DatabaseBlocksMetadata.java | 45 +++++-- .../database/DatabaseBlocksMetadataCache.java | 113 ++++++++++++++++++ .../DatabaseBlocksMetadataCacheFlusher.java | 10 ++ .../database/DatabaseReferencesMetadata.java | 52 +++++--- .../DatabaseReferencesMetadataCache.java | 107 +++++++++++++++++ ...atabaseReferencesMetadataCacheFlusher.java | 8 ++ .../cowdb/database/Long2LongConsumer.java | 7 ++ 8 files changed, 318 insertions(+), 25 deletions(-) create mode 100644 src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadataCache.java create mode 100644 src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadataCacheFlusher.java create mode 100644 src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadataCache.java create mode 100644 src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadataCacheFlusher.java create mode 100644 src/main/java/org/warp/cowdb/database/Long2LongConsumer.java diff --git a/src/main/java/org/warp/cowdb/IBlocksMetadata.java b/src/main/java/org/warp/cowdb/IBlocksMetadata.java index 12a6443..31c0dea 100644 --- a/src/main/java/org/warp/cowdb/IBlocksMetadata.java +++ b/src/main/java/org/warp/cowdb/IBlocksMetadata.java @@ -4,6 +4,7 @@ import java.io.IOException; public interface IBlocksMetadata { long EMPTY_BLOCK_ID = -1; + long ERROR_BLOCK_ID = -2; BlockInfo EMPTY_BLOCK_INFO = new BlockInfo(0, 0); /** diff --git a/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadata.java b/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadata.java index 7981dac..31e2570 100644 --- a/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadata.java +++ b/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadata.java @@ -6,19 +6,24 @@ import org.warp.cowdb.IBlocksMetadata; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.SeekableByteChannel; -import java.nio.file.Files; +import java.nio.channels.AsynchronousFileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; public class DatabaseBlocksMetadata implements IBlocksMetadata { - private final SeekableByteChannel metaFileChannel; + public static final BlockInfo ERROR_BLOCK_INFO = new BlockInfo(-2, 0); + + private final AsynchronousFileChannel metaFileChannel; private final int BLOCK_META_BYTES_COUNT = Long.BYTES + Integer.BYTES; + private final DatabaseBlocksMetadataCache cache; private long firstFreeBlock; public DatabaseBlocksMetadata(Path metaFile) throws IOException { - metaFileChannel = Files.newByteChannel(metaFile, StandardOpenOption.READ, StandardOpenOption.WRITE); + metaFileChannel = AsynchronousFileChannel.open(metaFile, StandardOpenOption.READ, StandardOpenOption.WRITE); firstFreeBlock = metaFileChannel.size() / BLOCK_META_BYTES_COUNT; + this.cache = new DatabaseBlocksMetadataCache(this::writeBlockToDisk); } @Override @@ -26,27 +31,45 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata { if (blockId == EMPTY_BLOCK_ID) { return EMPTY_BLOCK_INFO; } + BlockInfo blockInfo; + if ((blockInfo = cache.get(blockId)) != ERROR_BLOCK_INFO) { + return blockInfo; + } ByteBuffer buffer = ByteBuffer.allocate(BLOCK_META_BYTES_COUNT); - metaFileChannel.position(blockId * BLOCK_META_BYTES_COUNT).read(buffer); + try { + metaFileChannel.read(buffer, blockId * BLOCK_META_BYTES_COUNT).get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } buffer.flip(); long index = buffer.getLong(); int size = buffer.getInt(); - return new BlockInfo(index, size); + blockInfo = new BlockInfo(index, size); + cache.put(index, blockInfo); + return blockInfo; } @Override public long newBlock(long index, int size) throws IOException { long newBlockId = firstFreeBlock++; - ByteBuffer data = ByteBuffer.allocate(BLOCK_META_BYTES_COUNT); - data.putLong(index); - data.putInt(size); - data.flip(); - metaFileChannel.position(newBlockId * BLOCK_META_BYTES_COUNT).write(data); + BlockInfo blockInfo = new BlockInfo(index, size); + cache.put(newBlockId, blockInfo); return newBlockId; } @Override public void close() throws IOException { + cache.close(); metaFileChannel.close(); } + + private Future writeBlockToDisk(long blockId, long index, int size) { + ByteBuffer data = ByteBuffer.allocate(BLOCK_META_BYTES_COUNT); + data.putLong(index); + data.putInt(size); + data.flip(); + return metaFileChannel.write(data, blockId * BLOCK_META_BYTES_COUNT); + } } diff --git a/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadataCache.java b/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadataCache.java new file mode 100644 index 0000000..3250cdc --- /dev/null +++ b/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadataCache.java @@ -0,0 +1,113 @@ +package org.warp.cowdb.database; + +import it.unimi.dsi.fastutil.longs.Long2LongLinkedOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2LongMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectLinkedOpenHashMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.objects.ObjectIterator; +import org.warp.cowdb.BlockInfo; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static org.warp.cowdb.database.DatabaseBlocksMetadata.ERROR_BLOCK_INFO; + +public class DatabaseBlocksMetadataCache { + + private static final int GOOD_CACHE_SIZE = 70000; + private static final int MAX_CACHE_SIZE = 100000; + + private final Long2ObjectMap blocks2Info = new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE); + private final Object readAccessLock = new Object(); + private final Object writeAccessLock = new Object(); + private final DatabaseBlocksMetadataCacheFlusher flusher; + private volatile boolean closed; + + public DatabaseBlocksMetadataCache(DatabaseBlocksMetadataCacheFlusher flusher) { + this.flusher = flusher; + } + + public BlockInfo get(long block) throws IOException { + if (closed) throw new IOException("Cache already closed!"); + synchronized (readAccessLock) { + return blocks2Info.getOrDefault(block, ERROR_BLOCK_INFO); + } + } + + public void put(long block, BlockInfo value) throws IOException { + if (closed) return; + synchronized (readAccessLock) { + synchronized (writeAccessLock) { + blocks2Info.put(block, value); + } + } + this.flush(); + } + + public void remove(long block) { + if (closed) return; + synchronized (readAccessLock) { + synchronized (writeAccessLock) { + blocks2Info.remove(block); + } + } + } + + private void flush() throws IOException { + if (closed) return; + int blocks2InfoSize = blocks2Info.size(); + if (blocks2InfoSize > MAX_CACHE_SIZE) { + synchronized (writeAccessLock) { + ObjectIterator> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator(); + List> entriesToFlush = new LinkedList<>(); + while (blocks2InfoSize > GOOD_CACHE_SIZE) { + Long2ObjectMap.Entry entry = entriesIterator.next(); + BlockInfo blockInfo = entry.getValue(); + entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize())); + entriesIterator.remove(); + blocks2InfoSize--; + } + try { + for (Future entryToFlush : entriesToFlush) { + entryToFlush.get(); + } + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + } + } + } + + public void close() throws IOException { + synchronized (readAccessLock) { + synchronized (writeAccessLock) { + if (!closed) { + closed = true; + int blocks2InfoSize = blocks2Info.size(); + ObjectIterator> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator(); + List> entriesToFlush = new LinkedList<>(); + while (blocks2InfoSize > 0) { + Long2ObjectMap.Entry entry = entriesIterator.next(); + BlockInfo blockInfo = entry.getValue(); + entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize())); + blocks2InfoSize--; + } + try { + for (Future entryToFlush : entriesToFlush) { + entryToFlush.get(); + } + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + } + } + } + } +} diff --git a/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadataCacheFlusher.java b/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadataCacheFlusher.java new file mode 100644 index 0000000..f87ce55 --- /dev/null +++ b/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadataCacheFlusher.java @@ -0,0 +1,10 @@ +package org.warp.cowdb.database; + +import org.warp.cowdb.BlockInfo; + +import java.io.IOException; +import java.util.concurrent.Future; + +public interface DatabaseBlocksMetadataCacheFlusher { + Future flush(long key, long value1, int value2) throws IOException; +} diff --git a/src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadata.java b/src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadata.java index 09ee1d1..b6aabe8 100644 --- a/src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadata.java +++ b/src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadata.java @@ -1,61 +1,77 @@ package org.warp.cowdb.database; +import it.unimi.dsi.fastutil.bytes.ByteLinkedOpenCustomHashSet; import org.warp.cowdb.IReferencesMetadata; import org.warp.jcwdb.ann.DBClass; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.SeekableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import static org.warp.cowdb.IBlocksMetadata.EMPTY_BLOCK_ID; +import static org.warp.cowdb.IBlocksMetadata.ERROR_BLOCK_ID; public class DatabaseReferencesMetadata implements IReferencesMetadata { - private final SeekableByteChannel metaFileChannel; + private final AsynchronousFileChannel metaFileChannel; private final int REF_META_BYTES_COUNT = Long.BYTES; + private final DatabaseReferencesMetadataCache cache; private long firstFreeReference; public DatabaseReferencesMetadata(Path refMetaFile) throws IOException { - metaFileChannel = Files.newByteChannel(refMetaFile, StandardOpenOption.READ, StandardOpenOption.WRITE); + metaFileChannel = AsynchronousFileChannel.open(refMetaFile, StandardOpenOption.READ, StandardOpenOption.WRITE); firstFreeReference = metaFileChannel.size() / REF_META_BYTES_COUNT; + this.cache = new DatabaseReferencesMetadataCache(this::writeReferenceToDisk); } @Override public long getReference(long reference) throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(REF_META_BYTES_COUNT); if (reference >= firstFreeReference) { return EMPTY_BLOCK_ID; } - SeekableByteChannel currentFileChannel = metaFileChannel.position(reference * REF_META_BYTES_COUNT); - currentFileChannel.read(buffer); + long block; + if ((block = cache.get(reference)) != ERROR_BLOCK_ID) { + return block; + } + ByteBuffer buffer = ByteBuffer.allocate(REF_META_BYTES_COUNT); + try { + metaFileChannel.read(buffer, reference * REF_META_BYTES_COUNT).get(); + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } buffer.flip(); - long block = buffer.getLong(); - if (buffer.limit() == 0 || block == 0xFFFFFFFFFFFFFFFFL) { + block = buffer.getLong(); + if (buffer.limit() != 0 && block != 0xFFFFFFFFFFFFFFFFL) { + cache.put(reference, block); + return block; + } else { + cache.put(reference, EMPTY_BLOCK_ID); return EMPTY_BLOCK_ID; } - return block; } @Override public long newReference(long blockId) throws IOException { long newReference = firstFreeReference++; - editReference(newReference, blockId); + cache.put(newReference, blockId); return newReference; } @Override public void editReference(long reference, long blockId) throws IOException { - ByteBuffer data = ByteBuffer.allocate(REF_META_BYTES_COUNT); - data.putLong(blockId); - SeekableByteChannel currentFileChannel = metaFileChannel.position(reference * REF_META_BYTES_COUNT); - data.flip(); - currentFileChannel.write(data); + cache.put(reference, blockId); } @Override public void close() throws IOException { + cache.close(); metaFileChannel.close(); } @@ -63,4 +79,12 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata { public long getFirstFreeReference() { return firstFreeReference; } + + private Future writeReferenceToDisk(long reference, long blockId) { + ByteBuffer data = ByteBuffer.allocate(REF_META_BYTES_COUNT); + data.putLong(blockId); + data.flip(); + return metaFileChannel.write(data, reference * REF_META_BYTES_COUNT); + } + } diff --git a/src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadataCache.java b/src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadataCache.java new file mode 100644 index 0000000..39d6f7a --- /dev/null +++ b/src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadataCache.java @@ -0,0 +1,107 @@ +package org.warp.cowdb.database; + +import it.unimi.dsi.fastutil.longs.*; +import it.unimi.dsi.fastutil.objects.ObjectIterator; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static org.warp.cowdb.IBlocksMetadata.ERROR_BLOCK_ID; + +public class DatabaseReferencesMetadataCache { + + private static final int GOOD_CACHE_SIZE = 70000; + private static final int MAX_CACHE_SIZE = 100000; + + private final Long2LongMap references2Blocks = new Long2LongLinkedOpenHashMap(MAX_CACHE_SIZE); + private final Object readAccessLock = new Object(); + private final Object writeAccessLock = new Object(); + private final DatabaseReferencesMetadataCacheFlusher flusher; + private volatile boolean closed; + + public DatabaseReferencesMetadataCache(DatabaseReferencesMetadataCacheFlusher flusher) { + this.flusher = flusher; + } + + public long get(long reference) throws IOException { + if (closed) throw new IOException("Cache already closed!"); + synchronized (readAccessLock) { + return references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID); + } + } + + public void put(long reference, long value) throws IOException { + if (closed) return; + synchronized (readAccessLock) { + synchronized (writeAccessLock) { + references2Blocks.put(reference, value); + } + } + this.flush(); + } + + public void remove(long reference) { + if (closed) return; + synchronized (readAccessLock) { + synchronized (writeAccessLock) { + references2Blocks.remove(reference); + } + } + } + + private void flush() throws IOException { + if (closed) return; + int references2BlocksSize = references2Blocks.size(); + if (references2BlocksSize > MAX_CACHE_SIZE) { + synchronized (writeAccessLock) { + ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); + List> entriesToFlush = new LinkedList<>(); + while (references2BlocksSize > GOOD_CACHE_SIZE) { + Long2LongMap.Entry entry = entriesIterator.next(); + entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue())); + entriesIterator.remove(); + references2BlocksSize--; + } + try { + for (Future entryToFlush : entriesToFlush) { + entryToFlush.get(); + } + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + } + } + } + + public void close() throws IOException { + synchronized (readAccessLock) { + synchronized (writeAccessLock) { + if (!closed) { + closed = true; + int references2BlocksSize = references2Blocks.size(); + ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); + List> entriesToFlush = new LinkedList<>(); + while (references2BlocksSize > 0) { + Long2LongMap.Entry entry = entriesIterator.next(); + entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue())); + references2BlocksSize--; + } + try { + for (Future entryToFlush : entriesToFlush) { + entryToFlush.get(); + } + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } + } + } + } + } +} diff --git a/src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadataCacheFlusher.java b/src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadataCacheFlusher.java new file mode 100644 index 0000000..7a2c519 --- /dev/null +++ b/src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadataCacheFlusher.java @@ -0,0 +1,8 @@ +package org.warp.cowdb.database; + +import java.io.IOException; +import java.util.concurrent.Future; + +public interface DatabaseReferencesMetadataCacheFlusher { + Future flush(long key, long value) throws IOException; +} diff --git a/src/main/java/org/warp/cowdb/database/Long2LongConsumer.java b/src/main/java/org/warp/cowdb/database/Long2LongConsumer.java new file mode 100644 index 0000000..ea14c24 --- /dev/null +++ b/src/main/java/org/warp/cowdb/database/Long2LongConsumer.java @@ -0,0 +1,7 @@ +package org.warp.cowdb.database; + +import java.io.IOException; + +public interface Long2LongConsumer { + void accept(long a, long b); +}