Async metadata

This commit is contained in:
Andrea Cavalli 2019-03-03 00:54:47 +01:00
parent ba103ddde1
commit d4599ca495
8 changed files with 318 additions and 25 deletions

View File

@ -4,6 +4,7 @@ import java.io.IOException;
public interface IBlocksMetadata {
long EMPTY_BLOCK_ID = -1;
long ERROR_BLOCK_ID = -2;
BlockInfo EMPTY_BLOCK_INFO = new BlockInfo(0, 0);
/**

View File

@ -6,19 +6,24 @@ import org.warp.cowdb.IBlocksMetadata;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class DatabaseBlocksMetadata implements IBlocksMetadata {
private final SeekableByteChannel metaFileChannel;
public static final BlockInfo ERROR_BLOCK_INFO = new BlockInfo(-2, 0);
private final AsynchronousFileChannel metaFileChannel;
private final int BLOCK_META_BYTES_COUNT = Long.BYTES + Integer.BYTES;
private final DatabaseBlocksMetadataCache cache;
private long firstFreeBlock;
public DatabaseBlocksMetadata(Path metaFile) throws IOException {
metaFileChannel = Files.newByteChannel(metaFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
metaFileChannel = AsynchronousFileChannel.open(metaFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
firstFreeBlock = metaFileChannel.size() / BLOCK_META_BYTES_COUNT;
this.cache = new DatabaseBlocksMetadataCache(this::writeBlockToDisk);
}
@Override
@ -26,27 +31,45 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata {
if (blockId == EMPTY_BLOCK_ID) {
return EMPTY_BLOCK_INFO;
}
BlockInfo blockInfo;
if ((blockInfo = cache.get(blockId)) != ERROR_BLOCK_INFO) {
return blockInfo;
}
ByteBuffer buffer = ByteBuffer.allocate(BLOCK_META_BYTES_COUNT);
metaFileChannel.position(blockId * BLOCK_META_BYTES_COUNT).read(buffer);
try {
metaFileChannel.read(buffer, blockId * BLOCK_META_BYTES_COUNT).get();
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
buffer.flip();
long index = buffer.getLong();
int size = buffer.getInt();
return new BlockInfo(index, size);
blockInfo = new BlockInfo(index, size);
cache.put(index, blockInfo);
return blockInfo;
}
@Override
public long newBlock(long index, int size) throws IOException {
long newBlockId = firstFreeBlock++;
ByteBuffer data = ByteBuffer.allocate(BLOCK_META_BYTES_COUNT);
data.putLong(index);
data.putInt(size);
data.flip();
metaFileChannel.position(newBlockId * BLOCK_META_BYTES_COUNT).write(data);
BlockInfo blockInfo = new BlockInfo(index, size);
cache.put(newBlockId, blockInfo);
return newBlockId;
}
@Override
public void close() throws IOException {
cache.close();
metaFileChannel.close();
}
private Future<Integer> writeBlockToDisk(long blockId, long index, int size) {
ByteBuffer data = ByteBuffer.allocate(BLOCK_META_BYTES_COUNT);
data.putLong(index);
data.putInt(size);
data.flip();
return metaFileChannel.write(data, blockId * BLOCK_META_BYTES_COUNT);
}
}

View File

@ -0,0 +1,113 @@
package org.warp.cowdb.database;
import it.unimi.dsi.fastutil.longs.Long2LongLinkedOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2LongMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectLinkedOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import org.warp.cowdb.BlockInfo;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.warp.cowdb.database.DatabaseBlocksMetadata.ERROR_BLOCK_INFO;
public class DatabaseBlocksMetadataCache {
private static final int GOOD_CACHE_SIZE = 70000;
private static final int MAX_CACHE_SIZE = 100000;
private final Long2ObjectMap<BlockInfo> blocks2Info = new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE);
private final Object readAccessLock = new Object();
private final Object writeAccessLock = new Object();
private final DatabaseBlocksMetadataCacheFlusher flusher;
private volatile boolean closed;
public DatabaseBlocksMetadataCache(DatabaseBlocksMetadataCacheFlusher flusher) {
this.flusher = flusher;
}
public BlockInfo get(long block) throws IOException {
if (closed) throw new IOException("Cache already closed!");
synchronized (readAccessLock) {
return blocks2Info.getOrDefault(block, ERROR_BLOCK_INFO);
}
}
public void put(long block, BlockInfo value) throws IOException {
if (closed) return;
synchronized (readAccessLock) {
synchronized (writeAccessLock) {
blocks2Info.put(block, value);
}
}
this.flush();
}
public void remove(long block) {
if (closed) return;
synchronized (readAccessLock) {
synchronized (writeAccessLock) {
blocks2Info.remove(block);
}
}
}
private void flush() throws IOException {
if (closed) return;
int blocks2InfoSize = blocks2Info.size();
if (blocks2InfoSize > MAX_CACHE_SIZE) {
synchronized (writeAccessLock) {
ObjectIterator<Long2ObjectMap.Entry<BlockInfo>> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator();
List<Future<?>> entriesToFlush = new LinkedList<>();
while (blocks2InfoSize > GOOD_CACHE_SIZE) {
Long2ObjectMap.Entry<BlockInfo> entry = entriesIterator.next();
BlockInfo blockInfo = entry.getValue();
entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize()));
entriesIterator.remove();
blocks2InfoSize--;
}
try {
for (Future<?> entryToFlush : entriesToFlush) {
entryToFlush.get();
}
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
}
}
}
public void close() throws IOException {
synchronized (readAccessLock) {
synchronized (writeAccessLock) {
if (!closed) {
closed = true;
int blocks2InfoSize = blocks2Info.size();
ObjectIterator<Long2ObjectMap.Entry<BlockInfo>> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator();
List<Future<?>> entriesToFlush = new LinkedList<>();
while (blocks2InfoSize > 0) {
Long2ObjectMap.Entry<BlockInfo> entry = entriesIterator.next();
BlockInfo blockInfo = entry.getValue();
entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize()));
blocks2InfoSize--;
}
try {
for (Future<?> entryToFlush : entriesToFlush) {
entryToFlush.get();
}
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
}
}
}
}
}

View File

@ -0,0 +1,10 @@
package org.warp.cowdb.database;
import org.warp.cowdb.BlockInfo;
import java.io.IOException;
import java.util.concurrent.Future;
public interface DatabaseBlocksMetadataCacheFlusher {
Future<Integer> flush(long key, long value1, int value2) throws IOException;
}

View File

@ -1,61 +1,77 @@
package org.warp.cowdb.database;
import it.unimi.dsi.fastutil.bytes.ByteLinkedOpenCustomHashSet;
import org.warp.cowdb.IReferencesMetadata;
import org.warp.jcwdb.ann.DBClass;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.warp.cowdb.IBlocksMetadata.EMPTY_BLOCK_ID;
import static org.warp.cowdb.IBlocksMetadata.ERROR_BLOCK_ID;
public class DatabaseReferencesMetadata implements IReferencesMetadata {
private final SeekableByteChannel metaFileChannel;
private final AsynchronousFileChannel metaFileChannel;
private final int REF_META_BYTES_COUNT = Long.BYTES;
private final DatabaseReferencesMetadataCache cache;
private long firstFreeReference;
public DatabaseReferencesMetadata(Path refMetaFile) throws IOException {
metaFileChannel = Files.newByteChannel(refMetaFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
metaFileChannel = AsynchronousFileChannel.open(refMetaFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
firstFreeReference = metaFileChannel.size() / REF_META_BYTES_COUNT;
this.cache = new DatabaseReferencesMetadataCache(this::writeReferenceToDisk);
}
@Override
public long getReference(long reference) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(REF_META_BYTES_COUNT);
if (reference >= firstFreeReference) {
return EMPTY_BLOCK_ID;
}
SeekableByteChannel currentFileChannel = metaFileChannel.position(reference * REF_META_BYTES_COUNT);
currentFileChannel.read(buffer);
long block;
if ((block = cache.get(reference)) != ERROR_BLOCK_ID) {
return block;
}
ByteBuffer buffer = ByteBuffer.allocate(REF_META_BYTES_COUNT);
try {
metaFileChannel.read(buffer, reference * REF_META_BYTES_COUNT).get();
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
buffer.flip();
long block = buffer.getLong();
if (buffer.limit() == 0 || block == 0xFFFFFFFFFFFFFFFFL) {
block = buffer.getLong();
if (buffer.limit() != 0 && block != 0xFFFFFFFFFFFFFFFFL) {
cache.put(reference, block);
return block;
} else {
cache.put(reference, EMPTY_BLOCK_ID);
return EMPTY_BLOCK_ID;
}
return block;
}
@Override
public long newReference(long blockId) throws IOException {
long newReference = firstFreeReference++;
editReference(newReference, blockId);
cache.put(newReference, blockId);
return newReference;
}
@Override
public void editReference(long reference, long blockId) throws IOException {
ByteBuffer data = ByteBuffer.allocate(REF_META_BYTES_COUNT);
data.putLong(blockId);
SeekableByteChannel currentFileChannel = metaFileChannel.position(reference * REF_META_BYTES_COUNT);
data.flip();
currentFileChannel.write(data);
cache.put(reference, blockId);
}
@Override
public void close() throws IOException {
cache.close();
metaFileChannel.close();
}
@ -63,4 +79,12 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata {
public long getFirstFreeReference() {
return firstFreeReference;
}
private Future<Integer> writeReferenceToDisk(long reference, long blockId) {
ByteBuffer data = ByteBuffer.allocate(REF_META_BYTES_COUNT);
data.putLong(blockId);
data.flip();
return metaFileChannel.write(data, reference * REF_META_BYTES_COUNT);
}
}

View File

@ -0,0 +1,107 @@
package org.warp.cowdb.database;
import it.unimi.dsi.fastutil.longs.*;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.warp.cowdb.IBlocksMetadata.ERROR_BLOCK_ID;
public class DatabaseReferencesMetadataCache {
private static final int GOOD_CACHE_SIZE = 70000;
private static final int MAX_CACHE_SIZE = 100000;
private final Long2LongMap references2Blocks = new Long2LongLinkedOpenHashMap(MAX_CACHE_SIZE);
private final Object readAccessLock = new Object();
private final Object writeAccessLock = new Object();
private final DatabaseReferencesMetadataCacheFlusher flusher;
private volatile boolean closed;
public DatabaseReferencesMetadataCache(DatabaseReferencesMetadataCacheFlusher flusher) {
this.flusher = flusher;
}
public long get(long reference) throws IOException {
if (closed) throw new IOException("Cache already closed!");
synchronized (readAccessLock) {
return references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID);
}
}
public void put(long reference, long value) throws IOException {
if (closed) return;
synchronized (readAccessLock) {
synchronized (writeAccessLock) {
references2Blocks.put(reference, value);
}
}
this.flush();
}
public void remove(long reference) {
if (closed) return;
synchronized (readAccessLock) {
synchronized (writeAccessLock) {
references2Blocks.remove(reference);
}
}
}
private void flush() throws IOException {
if (closed) return;
int references2BlocksSize = references2Blocks.size();
if (references2BlocksSize > MAX_CACHE_SIZE) {
synchronized (writeAccessLock) {
ObjectIterator<Long2LongMap.Entry> entriesIterator = references2Blocks.long2LongEntrySet().iterator();
List<Future<?>> entriesToFlush = new LinkedList<>();
while (references2BlocksSize > GOOD_CACHE_SIZE) {
Long2LongMap.Entry entry = entriesIterator.next();
entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue()));
entriesIterator.remove();
references2BlocksSize--;
}
try {
for (Future<?> entryToFlush : entriesToFlush) {
entryToFlush.get();
}
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
}
}
}
public void close() throws IOException {
synchronized (readAccessLock) {
synchronized (writeAccessLock) {
if (!closed) {
closed = true;
int references2BlocksSize = references2Blocks.size();
ObjectIterator<Long2LongMap.Entry> entriesIterator = references2Blocks.long2LongEntrySet().iterator();
List<Future<?>> entriesToFlush = new LinkedList<>();
while (references2BlocksSize > 0) {
Long2LongMap.Entry entry = entriesIterator.next();
entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue()));
references2BlocksSize--;
}
try {
for (Future<?> entryToFlush : entriesToFlush) {
entryToFlush.get();
}
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
}
}
}
}
}

View File

@ -0,0 +1,8 @@
package org.warp.cowdb.database;
import java.io.IOException;
import java.util.concurrent.Future;
public interface DatabaseReferencesMetadataCacheFlusher {
Future<Integer> flush(long key, long value) throws IOException;
}

View File

@ -0,0 +1,7 @@
package org.warp.cowdb.database;
import java.io.IOException;
public interface Long2LongConsumer {
void accept(long a, long b);
}