Thread safe

Andrea Cavalli 2019-04-20 15:54:40 +02:00
parent 188eabed57
commit b425e4c129
15 changed files with 591 additions and 184 deletions

pom.xml

@@ -6,7 +6,7 @@
     <groupId>it.cavallium</groupId>
     <artifactId>strangedb-core</artifactId>
-    <version>1.5.5</version>
+    <version>1.5.7</version>
     <name>strangedb-core</name>
     <url>https://git.ignuranza.net/andreacavalli/strangedb-core</url>

DatabaseCore.java

@@ -4,6 +4,7 @@ import it.cavallium.strangedb.database.blocks.DatabaseBlocksIO;
 import it.cavallium.strangedb.database.blocks.DatabaseBlocksMetadata;
 import it.cavallium.strangedb.database.references.DatabaseReferencesIO;
 import it.cavallium.strangedb.database.references.DatabaseReferencesMetadata;
+import it.cavallium.strangedb.database.references.ReferenceInfo;

 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -11,16 +12,18 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;

+import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.NONEXISTENT_REFERENCE_INFO;
+
 public class DatabaseCore implements IDatabase {

     private final DatabaseFileIO fileIO;
     private final DatabaseBlocksIO blocksIO;
-    private final DatabaseBlocksMetadata blocksMetadata;
+    protected final DatabaseBlocksMetadata blocksMetadata;
     protected final DatabaseReferencesIO referencesIO;
     protected final DatabaseReferencesMetadata referencesMetadata;
-    private final Path dataFile;
-    private final Path blocksMetaFile;
-    private final Path referencesMetaFile;
+    protected final Path dataFile;
+    protected final Path blocksMetaFile;
+    protected final Path referencesMetaFile;
     protected volatile boolean closed;

     public DatabaseCore(Path dataFile, Path blocksMetaFile, Path referencesMetaFile) throws IOException {
@@ -82,17 +85,24 @@ public class DatabaseCore implements IDatabase {
         long referencesCount = databaseToClean.referencesMetadata.getFirstFreeReference();
         long blocksCount = databaseToClean.blocksMetadata.getTotalBlocksCount();
         long writtenReferences = 0;
+        long writtenBlocks = 0;
         for (int referenceID = 0; referenceID < referencesCount; referenceID++) {
             try {
-                ByteBuffer buffer = databaseToClean.referencesIO.readFromReference(referenceID);
-                newDatabase.referencesIO.writeToReference(referenceID, buffer.limit(), buffer);
-                writtenReferences++;
+                ReferenceInfo ref = databaseToClean.referencesMetadata.getCleanReference(referenceID);
+                if (!NONEXISTENT_REFERENCE_INFO.equals(ref)) {
+                    ByteBuffer buffer = databaseToClean.referencesIO.readFromReference(referenceID);
+                    newDatabase.referencesIO.writeToReference(referenceID, ref.getCleanerId(), buffer.limit(), buffer);
+                    writtenReferences++;
+                    if (buffer.limit() > 0) {
+                        writtenBlocks++;
+                    }
+                }
             } catch (IOException ex) {
                 System.out.println("Error while reading reference " + referenceID + ". References written: " + writtenReferences);
             }
         }
-        System.out.println("References written: " + writtenReferences + ". Removed " + (blocksCount - writtenReferences) + " blocks. Removed " + (referencesCount - writtenReferences) + " references.");
+        System.out.println("[Core Cleaner] References written: " + writtenReferences + ". Removed " + (blocksCount - writtenBlocks) + " blocks. Removed " + (referencesCount - writtenReferences) + " references.");
         databaseToClean.close();
         newDatabase.close();
         Files.deleteIfExists(newDataFile);
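
Note: the rewritten cleaner loop above skips slots whose metadata equals NONEXISTENT_REFERENCE_INFO instead of copying every slot blindly. A minimal sketch of that liveness check, using the interfaces from this commit (the helper class and method are hypothetical, not part of the commit; IReferencesIO is assumed to live in it.cavallium.strangedb.database like the other interfaces shown here):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import it.cavallium.strangedb.database.IReferencesIO;
    import it.cavallium.strangedb.database.IReferencesMetadata;
    import it.cavallium.strangedb.database.references.ReferenceInfo;

    import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.NONEXISTENT_REFERENCE_INFO;

    // Hypothetical helper, mirroring the cleaner's check above.
    class LiveReferenceReader {
        // Returns null for deleted slots instead of reading them.
        static ByteBuffer readIfLive(IReferencesMetadata meta, IReferencesIO io, long id) throws IOException {
            ReferenceInfo ref = meta.getCleanReference(id);
            if (NONEXISTENT_REFERENCE_INFO.equals(ref)) {
                return null; // cleanerId == ERRORED_CLEANER, blockId == ERROR_BLOCK_ID
            }
            return io.readFromReference(id); // limit() == 0 means a live but empty reference
        }
    }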

DatabaseFileIO.java

@@ -3,59 +3,70 @@ package it.cavallium.strangedb.database;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.file.Files;
+import java.nio.channels.AsynchronousFileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;

 public class DatabaseFileIO implements IFileIO {

-    private final SeekableByteChannel dataFileChannel;
-    private final Object dataAccessLock = new Object();
-    private long firstFreeIndex;
+    private final AsynchronousFileChannel dataFileChannel;
+    private final AtomicLong firstFreeIndex;
+    private volatile boolean closed = false;

     public DatabaseFileIO(Path dataFile) throws IOException {
-        synchronized (dataAccessLock) {
-            dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
-            firstFreeIndex = dataFileChannel.size();
-        }
+        dataFileChannel = AsynchronousFileChannel.open(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
+        firstFreeIndex = new AtomicLong(dataFileChannel.size());
     }

     @Override
     public ByteBuffer readAt(long index, int length) throws IOException {
+        if (closed) throw new IOException("Database closed!");
         ByteBuffer dataBuffer = ByteBuffer.allocate(length);
-        dataFileChannel.position(index).read(dataBuffer);
+        try {
+            dataFileChannel.read(dataBuffer, index).get();
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        } catch (ExecutionException e) {
+            throw new IOException(e.getCause());
+        }
         dataBuffer.flip();
         return dataBuffer;
     }

     @Override
-    public void writeAt(long index, int length, ByteBuffer data) throws IOException {
-        synchronized (dataAccessLock) {
-            if (data.position() != 0) {
-                throw new IOException("You didn't flip the ByteBuffer!");
-            }
-            if (firstFreeIndex < index + length) {
-                firstFreeIndex = index + length;
-            }
-            dataFileChannel.position(index).write(data);
-        }
+    public int writeAt(long index, int length, ByteBuffer data) throws IOException {
+        if (closed) throw new IOException("Database closed!");
+        return writeAt_(index, length, data);
+    }
+
+    private int writeAt_(long index, int length, ByteBuffer data) throws IOException {
+        if (data.position() != 0) {
+            throw new IOException("You didn't flip the ByteBuffer!");
+        }
+        firstFreeIndex.updateAndGet((firstFreeIndex) -> firstFreeIndex < index + length ? index + length : firstFreeIndex);
+        try {
+            return dataFileChannel.write(data, index).get();
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        } catch (ExecutionException e) {
+            throw new IOException(e.getCause());
+        }
     }

     @Override
     public long writeAtEnd(int length, ByteBuffer data) throws IOException {
-        synchronized (dataAccessLock) {
-            long index = firstFreeIndex;
-            firstFreeIndex += length;
-            writeAt(index, length, data);
-            return index;
-        }
+        if (closed) throw new IOException("Database closed!");
+        long index = firstFreeIndex.getAndAdd(length);
+        writeAt_(index, length, data);
+        return index;
     }

     @Override
     public void close() throws IOException {
-        synchronized (dataAccessLock) {
-            dataFileChannel.close();
-        }
+        if (closed) throw new IOException("Database already closed!");
+        closed = true;
+        dataFileChannel.close();
     }
 }
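
Note: the rework above replaces the single mutex with two pieces: a positional AsynchronousFileChannel, which supports concurrent reads and writes at independent offsets, and an AtomicLong end-of-file cursor, so appenders reserve their region with one getAndAdd and never contend on a lock. A standalone sketch of the same append pattern (class and method names are illustrative, not the committed code):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousFileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of lock-free appends: reserve the range atomically, then write positionally.
    class AppendOnlyWriter {
        private final AsynchronousFileChannel channel;
        private final AtomicLong end;

        AppendOnlyWriter(Path file) throws IOException {
            channel = AsynchronousFileChannel.open(file,
                    StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
            end = new AtomicLong(channel.size());
        }

        long append(ByteBuffer data) throws IOException {
            long offset = end.getAndAdd(data.remaining()); // reserve [offset, offset + len) atomically
            try {
                channel.write(data, offset).get();         // positional write: safe from any thread
            } catch (InterruptedException e) {
                throw new IOException(e);
            } catch (ExecutionException e) {
                throw new IOException(e.getCause());
            }
            return offset;
        }
    }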

IDatabase.java

@@ -4,6 +4,8 @@ import java.io.IOException;
 public interface IDatabase {

+    int DISK_BLOCK_SIZE = 4096;
+
     void close() throws IOException;

     boolean isClosed();

IFileIO.java

@@ -18,7 +18,7 @@ public interface IFileIO {
      * @param length length
      * @param data bytes
      */
-    void writeAt(long index, int length, ByteBuffer data) throws IOException;
+    int writeAt(long index, int length, ByteBuffer data) throws IOException;

     /**
      * Write *length* bytes in position *index*

IReferencesIO.java

@@ -21,10 +21,11 @@ public interface IReferencesIO {
     /**
      * Write some data to the reference
      * @param reference reference
+     * @param cleanerId cleaner id
      * @param size data size
      * @param data bytes
      */
-    void writeToReference(long reference, int size, ByteBuffer data) throws IOException;
+    void writeToReference(long reference, byte cleanerId, int size, ByteBuffer data) throws IOException;

     /**
      * Read data from the reference

IReferencesMetadata.java

@@ -1,5 +1,7 @@
 package it.cavallium.strangedb.database;

+import it.cavallium.strangedb.database.references.ReferenceInfo;
+
 import java.io.IOException;

 public interface IReferencesMetadata {
@@ -8,21 +10,36 @@ public interface IReferencesMetadata {
      * @param reference reference
      * @return block id
      */
-    long getReference(long reference) throws IOException;
+    long getReferenceBlockId(long reference) throws IOException;
+
+    /**
+     * Get reference info
+     * @param reference reference
+     * @return reference info
+     */
+    ReferenceInfo getCleanReference(long reference) throws IOException;

     /**
      * Allocate a block for a new reference
+     *
      * @param blockId block id
      * @return reference
      */
     long newReference(long blockId) throws IOException;

     /**
-     * Change reference size
+     * Change reference
      * @param reference reference
+     * @param cleanerId cleaner id
      * @param blockId block id
      */
-    void editReference(long reference, long blockId) throws IOException;
+    void editReference(long reference, byte cleanerId, long blockId) throws IOException;
+
+    /**
+     * Delete reference
+     * @param reference reference
+     */
+    void deleteReference(long reference) throws IOException;

     /**
      * Close file

DatabaseBlocksIO.java

@@ -6,6 +6,11 @@ import it.cavallium.strangedb.database.IBlocksMetadata;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;

 import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID;

DatabaseBlocksMetadata.java

@@ -4,6 +4,7 @@ package it.cavallium.strangedb.database.blocks;
 import it.cavallium.strangedb.database.IBlocksMetadata;

 import java.io.IOException;
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousFileChannel;
 import java.nio.file.Path;
@@ -11,11 +12,14 @@ import java.nio.file.StandardOpenOption;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;

+import static it.cavallium.strangedb.database.IDatabase.DISK_BLOCK_SIZE;
+
 public class DatabaseBlocksMetadata implements IBlocksMetadata {
     public static final BlockInfo ERROR_BLOCK_INFO = new BlockInfo(-2, 0);
+    private static final int BLOCK_META_BYTES_COUNT = Long.BYTES + Integer.BYTES;
+    public static final int BLOCK_META_READS_AT_EVERY_READ = (DISK_BLOCK_SIZE - DISK_BLOCK_SIZE % BLOCK_META_BYTES_COUNT) / BLOCK_META_BYTES_COUNT;
     private final AsynchronousFileChannel metaFileChannel;
-    private final int BLOCK_META_BYTES_COUNT = Long.BYTES + Integer.BYTES;
     private final DatabaseBlocksMetadataCache cache;
     private long firstFreeBlock;
@@ -34,19 +38,48 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata {
         if ((blockInfo = cache.get(blockId)) != ERROR_BLOCK_INFO) {
             return blockInfo;
         }
-        ByteBuffer buffer = ByteBuffer.allocate(BLOCK_META_BYTES_COUNT);
+        long position = blockId * BLOCK_META_BYTES_COUNT;
+        int size = BLOCK_META_READS_AT_EVERY_READ * BLOCK_META_BYTES_COUNT;
+        if (blockId + (size - 1) / BLOCK_META_BYTES_COUNT >= firstFreeBlock) {
+            size = (int) ((firstFreeBlock - blockId) * BLOCK_META_BYTES_COUNT);
+        }
+        int blocksCount = size / BLOCK_META_BYTES_COUNT;
+        ByteBuffer buffer = ByteBuffer.allocate(size);
         try {
-            metaFileChannel.read(buffer, blockId * BLOCK_META_BYTES_COUNT).get();
+            metaFileChannel.read(buffer, position).get();
         } catch (InterruptedException e) {
             throw new IOException(e);
         } catch (ExecutionException e) {
             throw new IOException(e.getCause());
         }
         buffer.flip();
-        long index = buffer.getLong();
-        int size = buffer.getInt();
-        blockInfo = new BlockInfo(index, size);
-        cache.put(blockId, blockInfo);
+
+        if (blocksCount < 1) {
+            throw new IOException("Trying to read <1 blocks");
+        }
+        if (buffer.limit() % BLOCK_META_BYTES_COUNT != 0 || buffer.limit() < BLOCK_META_BYTES_COUNT) {
+            throw new IOException("The buffer is smaller than the data requested.");
+        } else if (buffer.limit() != size) {
+            size = buffer.limit();
+            blocksCount = size / BLOCK_META_BYTES_COUNT;
+        }
+
+        long[] allBlockIds = new long[blocksCount];
+        BlockInfo[] allBlockInfo = new BlockInfo[blocksCount];
+        for (int delta = 0; delta < blocksCount; delta++) {
+            long blockToLoad = blockId + delta;
+            long blockIndex = buffer.getLong();
+            int blockSize = buffer.getInt();
+            BlockInfo currentBlockInfo = new BlockInfo(blockIndex, blockSize);
+            allBlockIds[delta] = blockToLoad;
+            allBlockInfo[delta] = currentBlockInfo;
+            if (blockToLoad == blockId) {
+                blockInfo = currentBlockInfo;
+            }
+        }
+        cache.putAll(allBlockIds, allBlockInfo);
         return blockInfo;
     }
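
Note: for concreteness, the numbers behind BLOCK_META_READS_AT_EVERY_READ (my arithmetic, derived from the constants above): each record is Long.BYTES + Integer.BYTES = 12 bytes and DISK_BLOCK_SIZE is 4096, so one metadata read now pulls up to (4096 - 4096 % 12) / 12 = 341 contiguous records (4092 bytes) instead of a single 12-byte entry, with the firstFreeBlock clamp preventing reads past the end of the file. A worked check:

    // Verifies the batch-size arithmetic; assumes only the two constants from this commit.
    public class BatchSizeCheck {
        public static void main(String[] args) {
            int diskBlockSize = 4096;                      // IDatabase.DISK_BLOCK_SIZE
            int recordBytes = Long.BYTES + Integer.BYTES;  // 12: block index + block size
            int perRead = (diskBlockSize - diskBlockSize % recordBytes) / recordBytes;
            System.out.println(perRead);                   // prints 341
        }
    }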

DatabaseBlocksMetadataCache.java

@@ -1,25 +1,31 @@
 package it.cavallium.strangedb.database.blocks;

-import it.unimi.dsi.fastutil.longs.Long2ObjectLinkedOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.*;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static it.cavallium.strangedb.database.blocks.DatabaseBlocksMetadata.BLOCK_META_READS_AT_EVERY_READ;

 public class DatabaseBlocksMetadataCache {

-    private static final int GOOD_CACHE_SIZE = 70000;
-    private static final int MAX_CACHE_SIZE = 100000;
+    private static final int BASE_QUANTITY = (BLOCK_META_READS_AT_EVERY_READ < 500 ? BLOCK_META_READS_AT_EVERY_READ : 500);
+    private static final int GOOD_CACHE_SIZE = 140 * BASE_QUANTITY;
+    private static final int FLUSH_CACHE_SIZE = 300 * BLOCK_META_READS_AT_EVERY_READ;
+    private static final int MAX_CACHE_SIZE = 400 * BLOCK_META_READS_AT_EVERY_READ;

-    private final Long2ObjectMap<BlockInfo> blocks2Info = new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE);
-    private final Object readAccessLock = new Object();
-    private final Object writeAccessLock = new Object();
+    private final Long2ObjectMap<BlockInfo> blocks2Info = new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE, 0.5f);
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
     private final DatabaseBlocksMetadataCacheFlusher flusher;
     private volatile boolean closed;
+    private ExecutorService flushService = Executors.newSingleThreadExecutor();
+    private volatile boolean flushing;

     public DatabaseBlocksMetadataCache(DatabaseBlocksMetadataCacheFlusher flusher) {
         this.flusher = flusher;
@@ -27,75 +33,132 @@ public class DatabaseBlocksMetadataCache {
     public BlockInfo get(long block) throws IOException {
         if (closed) throw new IOException("Cache already closed!");
-        synchronized (readAccessLock) {
+        lock.readLock().lock();
+        try {
             return blocks2Info.getOrDefault(block, DatabaseBlocksMetadata.ERROR_BLOCK_INFO);
+        } finally {
+            lock.readLock().unlock();
         }
     }

-    public void put(long block, BlockInfo blockInfo) throws IOException {
+    public void put(long block, BlockInfo blockInfo) {
         if (closed) return;
-        synchronized (readAccessLock) {
-            synchronized (writeAccessLock) {
-                blocks2Info.put(block, blockInfo);
-            }
+        lock.writeLock().lock();
+        try {
+            blocks2Info.put(block, blockInfo);
+        } finally {
+            lock.writeLock().unlock();
+            flushAsync();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void putAll(long[] blocks, BlockInfo[] blockInfos) {
+        if (closed) return;
+        lock.writeLock().lock();
+        try {
+            Long2ObjectMap blocksInfosToAdd = new Long2ObjectLinkedOpenHashMap<>(blocks, blockInfos, 0.5f);
+            blocks2Info.putAll(blocksInfosToAdd);
+        } finally {
+            lock.writeLock().unlock();
+            flushAsync();
+        }
+    }
+
+    private void flushAsync() {
+        if (blocks2Info.size() >= FLUSH_CACHE_SIZE && !flushing) {
+            flushing = true;
+            flushService.execute(() -> {
+                try {
+                    flush();
+                } catch (IOException e) {
+                    throw new RejectedExecutionException(e);
+                }
+            });
         }
-        this.flush();
     }

     private void flush() throws IOException {
-        if (closed) return;
+        if (closed) {
+            flushing = false;
+            return;
+        }
         int blocks2InfoSize = blocks2Info.size();
-        if (blocks2InfoSize > MAX_CACHE_SIZE) {
-            synchronized (writeAccessLock) {
+        if (blocks2InfoSize >= FLUSH_CACHE_SIZE) {
+            lock.writeLock().lock();
+            try {
                 ObjectIterator<Long2ObjectMap.Entry<BlockInfo>> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator();
-                List<Future<?>> entriesToFlush = new LinkedList<>();
-                while (blocks2InfoSize > GOOD_CACHE_SIZE) {
+                List<Future<?>> entriesToFlush = new ArrayList<>();
+                while (blocks2InfoSize >= GOOD_CACHE_SIZE) {
                     Long2ObjectMap.Entry<BlockInfo> entry = entriesIterator.next();
                     BlockInfo blockInfo = entry.getValue();
                     entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize()));
                     entriesIterator.remove();
                     if (entriesToFlush.size() >= 1000) {
-                        executeAsyncFlush(entriesToFlush);
+                        awaitFlushWriteEnd(entriesToFlush);
                     }
                     blocks2InfoSize--;
                 }
-                executeAsyncFlush(entriesToFlush);
+                awaitFlushWriteEnd(entriesToFlush);
+            } finally {
+                flushing = false;
+                lock.writeLock().unlock();
             }
+        } else {
+            flushing = false;
         }
     }

     public void close() throws IOException {
-        synchronized (readAccessLock) {
-            synchronized (writeAccessLock) {
-                if (!closed) {
-                    closed = true;
-                    int blocks2InfoSize = blocks2Info.size();
-                    ObjectIterator<Long2ObjectMap.Entry<BlockInfo>> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator();
-                    List<Future<?>> entriesToFlush = new LinkedList<>();
-                    while (blocks2InfoSize > 0) {
-                        Long2ObjectMap.Entry<BlockInfo> entry = entriesIterator.next();
-                        BlockInfo blockInfo = entry.getValue();
-                        entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize()));
-                        entriesIterator.remove();
-                        if (entriesToFlush.size() >= 1000) {
-                            executeAsyncFlush(entriesToFlush);
-                        }
-                        blocks2InfoSize--;
-                    }
-                    executeAsyncFlush(entriesToFlush);
+        if (!closed) {
+            closed = true;
+            stopFlushService();
+            lock.writeLock().lock();
+            try {
+                int blocks2InfoSize = blocks2Info.size();
+                ObjectIterator<Long2ObjectMap.Entry<BlockInfo>> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator();
+                List<Future<?>> entriesToFlush = new LinkedList<>();
+                while (blocks2InfoSize > 0) {
+                    Long2ObjectMap.Entry<BlockInfo> entry = entriesIterator.next();
+                    BlockInfo blockInfo = entry.getValue();
+                    entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize()));
+                    entriesIterator.remove();
+                    if (entriesToFlush.size() >= 1000) {
+                        awaitFlushWriteEnd(entriesToFlush);
+                    }
+                    blocks2InfoSize--;
                 }
+                awaitFlushWriteEnd(entriesToFlush);
+            } finally {
+                lock.writeLock().unlock();
             }
         }
     }

-    private void executeAsyncFlush(List<Future<?>> entriesToFlush) throws IOException {
+    private void stopFlushService() {
+        flushService.shutdown();
+        try { flushService.awaitTermination(180, TimeUnit.SECONDS); } catch (InterruptedException e) {}
+        if (!flushService.isTerminated()) {
+            flushService.shutdownNow();
+        }
+    }
+
+    private void awaitFlushWriteEnd(List<Future<?>> entriesToFlush) throws IOException {
         try {
-            for (Future<?> entryToFlush : entriesToFlush) {
-                entryToFlush.get();
-            }
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        } catch (ExecutionException e) {
+            entriesToFlush.parallelStream().forEach((entry) -> {
+                try {
+                    entry.get();
+                } catch (InterruptedException e) {
+                    throw new CompletionException(e);
+                } catch (ExecutionException e) {
+                    throw new CompletionException(e.getCause());
+                }
+            });
+        } catch (CompletionException e) {
             throw new IOException(e.getCause());
         } finally {
             entriesToFlush.clear();
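
Note: with the 341-entry batch worked out earlier, the new thresholds come to (again my arithmetic): BASE_QUANTITY = min(341, 500) = 341, GOOD_CACHE_SIZE = 47740, FLUSH_CACHE_SIZE = 102300, MAX_CACHE_SIZE = 136400. Writers no longer flush inline; they flip a volatile flag and hand the drain to a single-thread executor. A generic sketch of that trigger (names illustrative; like the committed code, the flag check is best-effort, and a duplicate trigger is harmless because flush() re-checks the threshold):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Generic form of the handoff used by flushAsync() above.
    class AsyncFlushTrigger {
        private final ExecutorService flushService = Executors.newSingleThreadExecutor();
        private volatile boolean flushing;

        void maybeFlush(int currentSize, int flushThreshold, Runnable flush) {
            if (currentSize >= flushThreshold && !flushing) {
                flushing = true;               // claimed by this writer
                flushService.execute(() -> {
                    try {
                        flush.run();           // drains down to the "good" size
                    } finally {
                        flushing = false;      // allow the next trigger
                    }
                });
            }
        }
    }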

DatabaseReferencesIO.java

@@ -5,6 +5,8 @@ import it.cavallium.strangedb.database.blocks.DatabaseBlocksIO;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;

 import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID;
@@ -12,6 +14,7 @@ public class DatabaseReferencesIO implements IReferencesIO {

     private final DatabaseBlocksIO blocksIO;
     private final DatabaseReferencesMetadata referencesMetadata;
+    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);

     public DatabaseReferencesIO(DatabaseBlocksIO blocksIO, DatabaseReferencesMetadata referencesMetadata) {
         this.blocksIO = blocksIO;
@@ -30,14 +33,24 @@ public class DatabaseReferencesIO implements IReferencesIO {
     }

     @Override
-    public void writeToReference(long reference, int size, ByteBuffer data) throws IOException {
-        long blockId = (size == 0) ? EMPTY_BLOCK_ID : blocksIO.newBlock(size, data);
-        referencesMetadata.editReference(reference, blockId);
+    public void writeToReference(long reference, byte cleanerId, int size, ByteBuffer data) throws IOException {
+        lock.writeLock().lock();
+        try {
+            long blockId = (size == 0) ? EMPTY_BLOCK_ID : blocksIO.newBlock(size, data);
+            referencesMetadata.editReference(reference, cleanerId, blockId);
+        } finally {
+            lock.writeLock().unlock();
+        }
     }

     @Override
     public ByteBuffer readFromReference(long reference) throws IOException {
-        long blockId = referencesMetadata.getReference(reference);
-        return blocksIO.readBlock(blockId);
+        lock.readLock().lock();
+        try {
+            long blockId = referencesMetadata.getReferenceBlockId(reference);
+            return blocksIO.readBlock(blockId);
+        } finally {
+            lock.readLock().unlock();
+        }
     }
 }
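
Note: the write path now takes the write lock so that allocating a block and editing the reference metadata appear as one step to concurrent readers, and every write carries a cleaner id. A hedged usage example (the helper class, the (byte) 1 cleaner id, and placing it alongside these classes in the references package are all illustrative assumptions, not from the commit):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Assumes this sits in it.cavallium.strangedb.database.references next to the classes above.
    class ReferenceRoundTrip {
        static ByteBuffer roundTrip(DatabaseReferencesIO refsIO, long referenceId) throws IOException {
            ByteBuffer payload = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
            refsIO.writeToReference(referenceId, (byte) 1, payload.limit(), payload);
            return refsIO.readFromReference(referenceId);
        }
    }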

DatabaseReferencesMetadata.java

@@ -7,79 +7,203 @@ import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousFileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;

 import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID;
 import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID;
+import static it.cavallium.strangedb.database.IDatabase.DISK_BLOCK_SIZE;

 public class DatabaseReferencesMetadata implements IReferencesMetadata {
+    public static final byte ERRORED_CLEANER = (byte) -1;
+    public static final byte BLANK_DATA_CLEANER = (byte) -2;
+    public static final ReferenceInfo NONEXISTENT_REFERENCE_INFO = new ReferenceInfo(ERRORED_CLEANER, ERROR_BLOCK_ID);
+    private static final int REF_META_BYTES_COUNT = Long.BYTES + Integer.BYTES;
+    public static final int REF_META_READS_AT_EVERY_READ = (DISK_BLOCK_SIZE - DISK_BLOCK_SIZE % REF_META_BYTES_COUNT) / REF_META_BYTES_COUNT;

     private final AsynchronousFileChannel metaFileChannel;
-    private final int REF_META_BYTES_COUNT = Long.BYTES;
     private final DatabaseReferencesMetadataCache cache;
     private long firstFreeReference;
+    private long lastWrittenReference;
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);

     public DatabaseReferencesMetadata(Path refMetaFile) throws IOException {
         metaFileChannel = AsynchronousFileChannel.open(refMetaFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
         firstFreeReference = metaFileChannel.size() / REF_META_BYTES_COUNT;
+        lastWrittenReference = firstFreeReference - 1;
         this.cache = new DatabaseReferencesMetadataCache(this::writeReferenceToDisk);
     }

     @Override
-    public long getReference(long reference) throws IOException {
+    public long getReferenceBlockId(long reference) throws IOException {
+        lock.readLock().lock();
+        try {
+            return getReferenceBlockId_(reference);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    private long getReferenceBlockId_(long reference) throws IOException {
         if (reference >= firstFreeReference) {
             return EMPTY_BLOCK_ID;
         }
         long block;
-        if ((block = cache.get(reference)) != ERROR_BLOCK_ID) {
+        if ((block = cache.getBlock(reference)) != ERROR_BLOCK_ID) {
             return block;
         }
-        ByteBuffer buffer = ByteBuffer.allocate(REF_META_BYTES_COUNT);
+        long position = reference * REF_META_BYTES_COUNT;
+        int size = REF_META_READS_AT_EVERY_READ * REF_META_BYTES_COUNT;
+        if (reference + (size - 1) / REF_META_BYTES_COUNT >= firstFreeReference) {
+            size = (int) ((firstFreeReference - reference) * REF_META_BYTES_COUNT);
+        }
+        int referencesCount = size / REF_META_BYTES_COUNT;
+        ByteBuffer buffer = ByteBuffer.allocate(size);
         try {
-            metaFileChannel.read(buffer, reference * REF_META_BYTES_COUNT).get();
+            metaFileChannel.read(buffer, position).get();
         } catch (InterruptedException e) {
             throw new IOException(e);
         } catch (ExecutionException e) {
             throw new IOException(e.getCause());
         }
         buffer.flip();
-        block = buffer.getLong();
-        if (buffer.limit() != 0 && block != 0xFFFFFFFFFFFFFFFFL) {
-            cache.put(reference, block);
-            return block;
-        } else {
-            cache.put(reference, EMPTY_BLOCK_ID);
-            return EMPTY_BLOCK_ID;
+
+        if (referencesCount < 1) {
+            throw new IOException("Trying to read <1 references");
+        }
+        if (buffer.limit() % REF_META_BYTES_COUNT != 0 || buffer.limit() < REF_META_BYTES_COUNT) {
+            throw new IOException("The buffer is smaller than the data requested.");
+        } else if (buffer.limit() != size) {
+            size = buffer.limit();
+            referencesCount = size / REF_META_BYTES_COUNT;
+        }
+
+        long[] allReferences = new long[referencesCount];
+        byte[] allCleaners = new byte[referencesCount];
+        long[] allBlocks = new long[referencesCount];
+        block = EMPTY_BLOCK_ID;
+        for (int delta = 0; delta < referencesCount; delta++) {
+            long referenceToLoad = reference + delta;
+            long currentBlock = buffer.getLong();
+            byte cleanerId = (byte) buffer.getInt();
+            if (buffer.limit() != 0 && currentBlock != 0xFFFFFFFFFFFFFFFFL) {
+                allReferences[delta] = referenceToLoad;
+                allCleaners[delta] = cleanerId;
+                allBlocks[delta] = currentBlock;
+                if (referenceToLoad == reference) {
+                    block = currentBlock;
+                }
+            } else {
+                allReferences[delta] = referenceToLoad;
+                allCleaners[delta] = cleanerId;
+                allBlocks[delta] = EMPTY_BLOCK_ID;
+                if (referenceToLoad == reference) {
+                    block = EMPTY_BLOCK_ID;
+                }
+            }
+        }
+        for (int delta = 0; delta < referencesCount; delta++) {
+            if (allCleaners[delta] == 0) {
+                System.out.println("ro");
+            }
+        }
+        cache.putAll(allReferences, allCleaners, allBlocks);
+        return block;
+    }
+
+    /**
+     * This method is <b>SLOW</b>! Use this only for the cleaner
+     * @param reference reference
+     * @return
+     * @throws IOException
+     */
+    @Deprecated
+    @Override
+    public ReferenceInfo getCleanReference(long reference) throws IOException {
+        lock.readLock().lock();
+        try {
+            getReferenceBlockId_(reference);
+            return cache.get(reference);
+        } finally {
+            lock.readLock().unlock();
         }
     }

     @Override
     public long newReference(long blockId) throws IOException {
-        long newReference = firstFreeReference++;
-        cache.put(newReference, blockId);
-        return newReference;
+        lock.writeLock().lock();
+        try {
+            long newReference = firstFreeReference++;
+            cache.put(newReference, BLANK_DATA_CLEANER, blockId);
+            return newReference;
+        } finally {
+            lock.writeLock().unlock();
+        }
     }

     @Override
-    public void editReference(long reference, long blockId) throws IOException {
-        cache.put(reference, blockId);
+    public void editReference(long reference, byte cleanerId, long blockId) throws IOException {
+        lock.writeLock().lock();
+        try {
+            cache.put(reference, cleanerId, blockId);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void deleteReference(long reference) throws IOException {
+        lock.writeLock().lock();
+        try {
+            cache.put(reference, NONEXISTENT_REFERENCE_INFO.getCleanerId(), NONEXISTENT_REFERENCE_INFO.getBlockId());
+        } finally {
+            lock.writeLock().unlock();
+        }
     }

     @Override
     public void close() throws IOException {
-        cache.close();
-        metaFileChannel.close();
+        lock.writeLock().lock();
+        try {
+            cache.close();
+            metaFileChannel.close();
+        } finally {
+            lock.writeLock().unlock();
+        }
     }

     @Override
     public long getFirstFreeReference() {
-        return firstFreeReference;
+        synchronized (lock.readLock()) {
+            return firstFreeReference;
+        }
     }

-    private Future<Integer> writeReferenceToDisk(long reference, long blockId) {
+    private Future<Integer> writeReferenceToDisk(long reference, byte cleanerId, long blockId) {
+        if (cleanerId == ERRORED_CLEANER) {
+            return CompletableFuture.failedFuture(new IOException("Passing a cleaner with the id of ERRORED_CLIENT"));
+        }
         ByteBuffer data = ByteBuffer.allocate(REF_META_BYTES_COUNT);
         data.putLong(blockId);
+        data.putInt(cleanerId & 0xFF);
         data.flip();
+        while (lastWrittenReference < reference - 1) {
+            ByteBuffer emptyData = ByteBuffer.allocate(REF_META_BYTES_COUNT);
+            emptyData.putLong(ERROR_BLOCK_ID);
+            emptyData.putInt(ERRORED_CLEANER & 0xFF);
+            emptyData.flip();
+            metaFileChannel.write(emptyData, ++lastWrittenReference * REF_META_BYTES_COUNT);
+        }
+        if (reference > lastWrittenReference) {
+            lastWrittenReference = reference;
+        }
         return metaFileChannel.write(data, reference * REF_META_BYTES_COUNT);
     }
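
Note: each reference record on disk grows from 8 to 12 bytes: the block id as a long, then the cleaner id widened into an int via cleanerId & 0xFF. writeReferenceToDisk also backfills any never-written slots below the target with ERROR_BLOCK_ID/ERRORED_CLEANER records, so reads never hit undefined bytes. A sketch of the record codec implied by that layout (hypothetical helper, not in the commit):

    import java.nio.ByteBuffer;

    // Hypothetical codec for the 12-byte reference record written above.
    final class RefRecord {
        static final int BYTES = Long.BYTES + Integer.BYTES; // 12

        static ByteBuffer encode(long blockId, byte cleanerId) {
            ByteBuffer b = ByteBuffer.allocate(BYTES);
            b.putLong(blockId);
            b.putInt(cleanerId & 0xFF); // stored widened; masking keeps the int non-negative
            b.flip();
            return b;
        }

        static long blockId(ByteBuffer record) {
            return record.getLong(0);
        }

        static byte cleanerId(ByteBuffer record) {
            return (byte) record.getInt(Long.BYTES); // matches (byte) buffer.getInt() in the read path
        }
    }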

DatabaseReferencesMetadataCache.java

@@ -4,108 +4,192 @@ import it.unimi.dsi.fastutil.longs.*;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
+import java.util.concurrent.locks.ReentrantReadWriteLock;

 import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID;
+import static it.cavallium.strangedb.database.blocks.DatabaseBlocksMetadata.BLOCK_META_READS_AT_EVERY_READ;
+import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.*;

 public class DatabaseReferencesMetadataCache {

-    private static final int GOOD_CACHE_SIZE = 70000;
-    private static final int MAX_CACHE_SIZE = 100000;
+    private static final int BASE_QUANTITY = (REF_META_READS_AT_EVERY_READ < 500 ? REF_META_READS_AT_EVERY_READ : 500);
+    private static final int GOOD_CACHE_SIZE = 140 * BASE_QUANTITY;
+    private static final int FLUSH_CACHE_SIZE = 300 * BLOCK_META_READS_AT_EVERY_READ;
+    private static final int MAX_CACHE_SIZE = 400 * BASE_QUANTITY;

-    private final Long2LongMap references2Blocks = new Long2LongLinkedOpenHashMap(MAX_CACHE_SIZE);
-    private final Object readAccessLock = new Object();
-    private final Object writeAccessLock = new Object();
+    private final Long2LongMap references2Blocks = new Long2LongLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f);
+    private final Long2ByteMap referencesCleaners = new Long2ByteLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f);
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
     private final DatabaseReferencesMetadataCacheFlusher flusher;
     private volatile boolean closed;
+    private ExecutorService flushService = Executors.newSingleThreadExecutor();
+    private volatile boolean flushing;

     public DatabaseReferencesMetadataCache(DatabaseReferencesMetadataCacheFlusher flusher) {
         this.flusher = flusher;
     }

-    public long get(long reference) throws IOException {
-        synchronized (readAccessLock) {
-            if (closed) throw new IOException("Cache already closed!");
+    public long getBlock(long reference) throws IOException {
+        if (closed) throw new IOException("Cache already closed!");
+        lock.readLock().lock();
+        try {
             return references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID);
+        } finally {
+            lock.readLock().unlock();
         }
     }

-    public void put(long reference, long blockId) throws IOException {
-        synchronized (readAccessLock) {
-            synchronized (writeAccessLock) {
-                if (closed) return;
-                references2Blocks.put(reference, blockId);
-            }
+    public ReferenceInfo get(long reference) throws IOException {
+        if (closed) throw new IOException("Cache already closed!");
+        lock.readLock().lock();
+        try {
+            long blockId = references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID);
+            if (blockId == ERROR_BLOCK_ID) {
+                return NONEXISTENT_REFERENCE_INFO;
+            }
+            byte cleanerId = referencesCleaners.get(reference);
+            return new ReferenceInfo(cleanerId, blockId);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    public void put(long reference, byte cleanerId, long blockId) throws IOException {
+        if (closed) throw new IOException("Cache already closed!");
+        lock.writeLock().lock();
+        try {
+            if (cleanerId == 0) {
+                throw new IOException("Null cleaner id");
+            }
+            references2Blocks.put(reference, blockId);
+            referencesCleaners.put(reference, cleanerId);
+        } finally {
+            lock.writeLock().unlock();
+            flushAsync();
+        }
+    }
+
+    public void putAll(long[] references, byte[] cleanerIds, long[] blockIds) throws IOException {
+        if (closed) throw new IOException("Cache already closed!");
+        lock.writeLock().lock();
+        try {
+            for (int i = 0; i < references.length; i++) {
+                if (cleanerIds[i] == 0) {
+                    throw new IOException("Null cleaner id");
+                }
+            }
+            Long2LongMap referencesBlocksToAdd = new Long2LongLinkedOpenHashMap(references, blockIds, 0.5f);
+            Long2ByteMap referencesCleanersToAdd = new Long2ByteLinkedOpenHashMap(references, cleanerIds, 0.5f);
+            references2Blocks.putAll(referencesBlocksToAdd);
+            referencesCleaners.putAll(referencesCleanersToAdd);
+        } finally {
+            lock.writeLock().unlock();
+            flushAsync();
+        }
+    }
+
+    private void flushAsync() {
+        if (references2Blocks.size() >= FLUSH_CACHE_SIZE && !flushing) {
+            flushing = true;
+            flushService.execute(() -> {
+                try {
+                    flush();
+                } catch (IOException e) {
+                    throw new RejectedExecutionException(e);
+                }
+            });
         }
-        this.flush();
     }

     private void flush() throws IOException {
-        synchronized (readAccessLock) {
-            synchronized (writeAccessLock) {
-                if (closed) return;
-                int references2BlocksSize = references2Blocks.size();
-                if (references2BlocksSize > MAX_CACHE_SIZE) {
-                    synchronized (writeAccessLock) {
-                        ObjectIterator<Long2LongMap.Entry> entriesIterator = references2Blocks.long2LongEntrySet().iterator();
-                        List<Future<?>> entriesToFlush = new LinkedList<>();
-                        while (references2BlocksSize > GOOD_CACHE_SIZE) {
-                            Long2LongMap.Entry entry = entriesIterator.next();
-                            entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue()));
-                            entriesIterator.remove();
-                            if (entriesToFlush.size() >= 1000) {
-                                executeAsyncFlush(entriesToFlush);
-                            }
-                            references2BlocksSize--;
-                        }
-                        executeAsyncFlush(entriesToFlush);
-                    }
-                }
+        if (closed) {
+            flushing = false;
+            return;
+        }
+        int references2BlocksSize = references2Blocks.size();
+        if (references2BlocksSize >= FLUSH_CACHE_SIZE) {
+            lock.writeLock().lock();
+            try {
+                ObjectIterator<Long2LongMap.Entry> entriesIterator = references2Blocks.long2LongEntrySet().iterator();
+                ObjectIterator<Long2ByteMap.Entry> cleanersIterator = referencesCleaners.long2ByteEntrySet().iterator();
+                List<Future<?>> entriesToFlush = new ArrayList<>();
+                while (references2BlocksSize >= GOOD_CACHE_SIZE) {
+                    Long2LongMap.Entry entry = entriesIterator.next();
+                    Long2ByteMap.Entry cleaner = cleanersIterator.next();
+                    entriesToFlush.add(flusher.flush(entry.getLongKey(), cleaner.getByteValue(), entry.getLongValue()));
+                    entriesIterator.remove();
+                    cleanersIterator.remove();
+                    if (entriesToFlush.size() >= 1000) {
+                        awaitFlushWriteEnd(entriesToFlush);
+                    }
+                    references2BlocksSize--;
+                }
+                awaitFlushWriteEnd(entriesToFlush);
+            } finally {
+                flushing = false;
+                lock.writeLock().unlock();
             }
+        } else {
+            flushing = false;
         }
     }

     public void close() throws IOException {
-        synchronized (readAccessLock) {
-            synchronized (writeAccessLock) {
-                if (!closed) {
-                    closed = true;
-                    int references2BlocksSize = references2Blocks.size();
-                    ObjectIterator<Long2LongMap.Entry> entriesIterator = references2Blocks.long2LongEntrySet().iterator();
-                    List<Future<?>> entriesToFlush = new LinkedList<>();
-                    while (references2BlocksSize > 0) {
-                        Long2LongMap.Entry entry = entriesIterator.next();
-                        entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue()));
-                        entriesIterator.remove();
-                        if (entriesToFlush.size() >= 1000) {
-                            executeAsyncFlush(entriesToFlush);
-                        }
-                        references2BlocksSize--;
-                    }
-                    executeAsyncFlush(entriesToFlush);
-                }
+        if (!closed) {
+            closed = true;
+            stopFlushService();
+            lock.writeLock().lock();
+            try {
+                int references2BlocksSize = references2Blocks.size();
+                ObjectIterator<Long2LongMap.Entry> entriesIterator = references2Blocks.long2LongEntrySet().iterator();
+                ObjectIterator<Long2ByteMap.Entry> cleanersIterator = referencesCleaners.long2ByteEntrySet().iterator();
+                List<Future<?>> entriesToFlush = new LinkedList<>();
+                while (references2BlocksSize > 0) {
+                    Long2LongMap.Entry entry = entriesIterator.next();
+                    Long2ByteMap.Entry cleaner = cleanersIterator.next();
+                    entriesToFlush.add(flusher.flush(entry.getLongKey(), cleaner.getByteValue(), entry.getLongValue()));
+                    entriesIterator.remove();
+                    cleanersIterator.remove();
+                    if (entriesToFlush.size() >= 1000) {
+                        awaitFlushWriteEnd(entriesToFlush);
+                    }
+                    references2BlocksSize--;
+                }
+                awaitFlushWriteEnd(entriesToFlush);
+            } finally {
+                lock.writeLock().unlock();
             }
         }
     }

-    private void executeAsyncFlush(List<Future<?>> entriesToFlush) throws IOException {
-        synchronized (readAccessLock) {
-            synchronized (writeAccessLock) {
-                try {
-                    for (Future<?> entryToFlush : entriesToFlush) {
-                        entryToFlush.get();
-                    }
-                } catch (InterruptedException e) {
-                    throw new IOException(e);
-                } catch (ExecutionException e) {
-                    throw new IOException(e.getCause());
-                } finally {
-                    entriesToFlush.clear();
-                }
-            }
-        }
+    private void stopFlushService() {
+        flushService.shutdown();
+        try { flushService.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) {}
+        if (!flushService.isTerminated()) {
+            flushService.shutdownNow();
+        }
+    }
+
+    private void awaitFlushWriteEnd(List<Future<?>> entriesToFlush) throws IOException {
+        try {
+            entriesToFlush.parallelStream().forEach((entry) -> {
+                try {
+                    entry.get();
+                } catch (InterruptedException e) {
+                    throw new CompletionException(e);
+                } catch (ExecutionException e) {
+                    throw new CompletionException(e.getCause());
+                }
+            });
+        } catch (CompletionException e) {
+            throw new IOException(e.getCause());
+        } finally {
+            entriesToFlush.clear();
+        }
     }
 }

DatabaseReferencesMetadataCacheFlusher.java

@@ -4,5 +4,5 @@ import java.io.IOException;
 import java.util.concurrent.Future;

 public interface DatabaseReferencesMetadataCacheFlusher {
-    Future<Integer> flush(long key, long value) throws IOException;
+    Future<Integer> flush(long reference, byte cleanerId, long block) throws IOException;
 }
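
Note: the widened three-argument signature keeps this a functional interface, which is why DatabaseReferencesMetadata can wire itself in with a method reference (new DatabaseReferencesMetadataCache(this::writeReferenceToDisk), as shown above). A no-op stand-in, useful for tests, might look like this (illustrative, not part of the commit):

    import java.util.concurrent.CompletableFuture;

    // Any (long, byte, long) -> Future<Integer> lambda or method satisfies the interface.
    class NoopFlusher {
        static final DatabaseReferencesMetadataCacheFlusher INSTANCE =
                (reference, cleanerId, block) -> CompletableFuture.completedFuture(0);
    }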

ReferenceInfo.java

@@ -0,0 +1,44 @@
+package it.cavallium.strangedb.database.references;
+
+import java.util.Objects;
+import java.util.StringJoiner;
+
+public class ReferenceInfo {
+    private final byte cleanerId;
+    private final long blockId;
+
+    public ReferenceInfo(byte cleanerId, long blockId) {
+        this.cleanerId = cleanerId;
+        this.blockId = blockId;
+    }
+
+    public byte getCleanerId() {
+        return cleanerId;
+    }
+
+    public long getBlockId() {
+        return blockId;
+    }
+
+    @Override
+    public String toString() {
+        return new StringJoiner(", ", ReferenceInfo.class.getSimpleName() + "[", "]")
+                .add("cleanerId=" + cleanerId)
+                .add("blockId=" + blockId)
+                .toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ReferenceInfo that = (ReferenceInfo) o;
+        return cleanerId == that.cleanerId &&
+                blockId == that.blockId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(cleanerId, blockId);
+    }
+}