Flush executor

Andrea Cavalli 2019-04-20 22:18:34 +02:00
parent b425e4c129
commit 7321795000
8 changed files with 208 additions and 206 deletions

it/cavallium/strangedb/database/blocks/DatabaseBlocksIO.java

@@ -13,6 +13,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID;
+import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID;
 public class DatabaseBlocksIO implements IBlocksIO {
@@ -29,6 +30,12 @@ public class DatabaseBlocksIO implements IBlocksIO {
 		if (size == 0) {
 			return EMPTY_BLOCK_ID;
 		}
+		if (size < 0) {
+			throw new IOException("Trying to create a block with size " + size);
+		}
+		if (data.limit() < size) {
+			throw new IOException("Trying to create a block with size " + size + " but with a buffer of size " + data.limit());
+		}
 		long index = fileIO.writeAtEnd(size, data);
 		return blocksMetadata.newBlock(index, size);
 	}
@@ -38,6 +45,12 @@ public class DatabaseBlocksIO implements IBlocksIO {
 		if (blockId == EMPTY_BLOCK_ID) {
 			return ByteBuffer.wrap(new byte[0]);
 		}
+		if (blockId == ERROR_BLOCK_ID) {
+			throw new IOException("Errored block id");
+		}
+		if (blockId < 0) {
+			throw new IOException("Block id " + blockId + " is not valid");
+		}
 		BlockInfo blockInfo = blocksMetadata.getBlockInfo(blockId);
 		return fileIO.readAt(blockInfo.getIndex(), blockInfo.getSize());
 	}
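The new guard clauses make createBlock and readBlock fail fast on nonsense arguments instead of corrupting the file. A standalone sketch of the same fail-fast pattern; the store class and the EMPTY_BLOCK_ID constant below are illustrative stand-ins, not code from this repository:

import java.io.IOException;
import java.nio.ByteBuffer;

class GuardedBlockStore {
    // stand-in for IBlocksMetadata.EMPTY_BLOCK_ID
    static final long EMPTY_BLOCK_ID = -1;

    long createBlock(int size, ByteBuffer data) throws IOException {
        if (size == 0) {
            return EMPTY_BLOCK_ID; // zero-length payloads never touch the file
        }
        if (size < 0) {
            throw new IOException("Trying to create a block with size " + size);
        }
        if (data.limit() < size) {
            // the caller promised `size` bytes but the buffer cannot supply them
            throw new IOException("Buffer of size " + data.limit() + " cannot back a block of size " + size);
        }
        // ... append the bytes here and return the new block id ...
        return 0;
    }
}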

it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadata.java

@@ -4,13 +4,12 @@ package it.cavallium.strangedb.database.blocks;
 import it.cavallium.strangedb.database.IBlocksMetadata;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousFileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
 import static it.cavallium.strangedb.database.IDatabase.DISK_BLOCK_SIZE;
@@ -21,11 +20,11 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata {
 	private final AsynchronousFileChannel metaFileChannel;
 	private final DatabaseBlocksMetadataCache cache;
-	private long firstFreeBlock;
+	private AtomicLong firstFreeBlock;
 	public DatabaseBlocksMetadata(Path metaFile) throws IOException {
 		metaFileChannel = AsynchronousFileChannel.open(metaFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
-		firstFreeBlock = metaFileChannel.size() / BLOCK_META_BYTES_COUNT;
+		firstFreeBlock = new AtomicLong(metaFileChannel.size() / BLOCK_META_BYTES_COUNT);
 		this.cache = new DatabaseBlocksMetadataCache(this::writeBlockToDisk);
 	}
@@ -34,14 +33,18 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata {
 		if (blockId == EMPTY_BLOCK_ID) {
 			return EMPTY_BLOCK_INFO;
 		}
+		if (blockId == ERROR_BLOCK_ID) {
+			throw new IOException("Errored block id");
+		}
 		BlockInfo blockInfo;
 		if ((blockInfo = cache.get(blockId)) != ERROR_BLOCK_INFO) {
 			return blockInfo;
 		}
 		long position = blockId * BLOCK_META_BYTES_COUNT;
 		int size = BLOCK_META_READS_AT_EVERY_READ * BLOCK_META_BYTES_COUNT;
-		if (blockId + (size - 1) / BLOCK_META_BYTES_COUNT >= firstFreeBlock) {
-			size = (int) ((firstFreeBlock - blockId) * BLOCK_META_BYTES_COUNT);
+		long currentFirstFreeBlock = this.firstFreeBlock.get();
+		if (blockId + (size - 1) / BLOCK_META_BYTES_COUNT >= currentFirstFreeBlock) {
+			size = (int) ((currentFirstFreeBlock - blockId) * BLOCK_META_BYTES_COUNT);
 		}
 		int blocksCount = size / BLOCK_META_BYTES_COUNT;
@@ -68,6 +71,7 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata {
 		long[] allBlockIds = new long[blocksCount];
 		BlockInfo[] allBlockInfo = new BlockInfo[blocksCount];
+		blockInfo = EMPTY_BLOCK_INFO;
 		for (int delta = 0; delta < blocksCount; delta++) {
 			long blockToLoad = blockId + delta;
 			long blockIndex = buffer.getLong();
@@ -88,7 +92,7 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata {
 		if (size == 0) {
 			return EMPTY_BLOCK_ID;
 		}
-		long newBlockId = firstFreeBlock++;
+		long newBlockId = firstFreeBlock.getAndIncrement();
 		BlockInfo blockInfo = new BlockInfo(index, size);
 		cache.put(newBlockId, blockInfo);
 		return newBlockId;
@@ -100,16 +104,22 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata {
 		metaFileChannel.close();
 	}
-	private Future<Integer> writeBlockToDisk(long blockId, long index, int size) {
+	private void writeBlockToDisk(long blockId, long index, int size) throws IOException {
 		ByteBuffer data = ByteBuffer.allocate(BLOCK_META_BYTES_COUNT);
 		data.putLong(index);
 		data.putInt(size);
 		data.flip();
-		return metaFileChannel.write(data, blockId * BLOCK_META_BYTES_COUNT);
+		try {
+			metaFileChannel.write(data, blockId * BLOCK_META_BYTES_COUNT).get();
+		} catch (InterruptedException e) {
+			throw new IOException(e);
+		} catch (ExecutionException e) {
+			throw new IOException(e.getCause());
+		}
 	}
 	@Override
 	public long getTotalBlocksCount() {
-		return firstFreeBlock;
+		return firstFreeBlock.get();
 	}
 }
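writeBlockToDisk no longer hands a Future back to the cache: it blocks on the AsynchronousFileChannel write and translates both failure modes into IOException, which is what lets the flusher interface further down become synchronous. A minimal sketch of that blocking-write idiom; the file name and record layout are assumptions of this example, and the re-interrupt is an addition:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;

public class BlockingWriteExample {
    public static void main(String[] args) throws IOException {
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                Paths.get("blocks.meta"),
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
        ByteBuffer meta = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
        meta.putLong(42L);  // block index
        meta.putInt(1024);  // block size
        meta.flip();
        try {
            // write() returns a Future<Integer>; get() blocks until the bytes are written
            channel.write(meta, 0L).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            throw new IOException(e.getCause()); // unwrap the real I/O failure
        } finally {
            channel.close();
        }
    }
}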

it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCache.java

@@ -1,10 +1,12 @@
 package it.cavallium.strangedb.database.blocks;
 import it.unimi.dsi.fastutil.longs.*;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.*;
@@ -19,13 +21,12 @@ public class DatabaseBlocksMetadataCache {
 	private static final int FLUSH_CACHE_SIZE = 300 * BLOCK_META_READS_AT_EVERY_READ;
 	private static final int MAX_CACHE_SIZE = 400 * BLOCK_META_READS_AT_EVERY_READ;
-	private final Long2ObjectMap<BlockInfo> blocks2Info = new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE, 0.5f);
+	private final Long2ObjectMap<BlockInfo> blocks2Info = Long2ObjectMaps.synchronize(new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE, 0.5f));
 	private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
 	private final DatabaseBlocksMetadataCacheFlusher flusher;
 	private volatile boolean closed;
-	private ExecutorService flushService = Executors.newSingleThreadExecutor();
-	private volatile boolean flushing;
+	ExecutorService flushExecutorService = Executors.newFixedThreadPool(ForkJoinPool.getCommonPoolParallelism(), (r) -> new Thread(r, "Blocks Flush Thread"));
 	public DatabaseBlocksMetadataCache(DatabaseBlocksMetadataCacheFlusher flusher) {
 		this.flusher = flusher;
@@ -41,127 +42,102 @@ public class DatabaseBlocksMetadataCache {
 		}
 	}
-	public void put(long block, BlockInfo blockInfo) {
+	public void put(long block, BlockInfo blockInfo) throws IOException {
 		if (closed) return;
 		lock.writeLock().lock();
 		try {
 			blocks2Info.put(block, blockInfo);
+			flush();
 		} finally {
 			lock.writeLock().unlock();
-			flushAsync();
 		}
 	}
 	@SuppressWarnings("unchecked")
-	public void putAll(long[] blocks, BlockInfo[] blockInfos) {
+	public void putAll(long[] blocks, BlockInfo[] blockInfos) throws IOException {
 		if (closed) return;
 		lock.writeLock().lock();
 		try {
 			Long2ObjectMap blocksInfosToAdd = new Long2ObjectLinkedOpenHashMap<>(blocks, blockInfos, 0.5f);
 			blocks2Info.putAll(blocksInfosToAdd);
+			flush();
 		} finally {
 			lock.writeLock().unlock();
-			flushAsync();
-		}
-	}
-	private void flushAsync() {
-		if (blocks2Info.size() >= FLUSH_CACHE_SIZE && !flushing) {
-			flushing = true;
-			flushService.execute(() -> {
-				try {
-					flush();
-				} catch (IOException e) {
-					throw new RejectedExecutionException(e);
-				}
-			});
 		}
 	}
 	private void flush() throws IOException {
-		if (closed) {
-			flushing = false;
-			return;
-		}
+		if (closed) return;
 		int blocks2InfoSize = blocks2Info.size();
 		if (blocks2InfoSize >= FLUSH_CACHE_SIZE) {
-			lock.writeLock().lock();
-			try {
-				ObjectIterator<Long2ObjectMap.Entry<BlockInfo>> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator();
-				List<Future<?>> entriesToFlush = new ArrayList<>();
-				while (blocks2InfoSize >= GOOD_CACHE_SIZE) {
-					Long2ObjectMap.Entry<BlockInfo> entry = entriesIterator.next();
-					BlockInfo blockInfo = entry.getValue();
-					entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize()));
-					entriesIterator.remove();
-					if (entriesToFlush.size() >= 1000) {
-						awaitFlushWriteEnd(entriesToFlush);
-					}
-					blocks2InfoSize--;
-				}
-				awaitFlushWriteEnd(entriesToFlush);
-			} finally {
-				flushing = false;
-				lock.writeLock().unlock();
-			}
-		} else {
-			flushing = false;
+			ObjectIterator<Long2ObjectMap.Entry<BlockInfo>> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator();
+			@SuppressWarnings("unchecked")
+			ObjectArrayList<Callable<Void>> tasks = ObjectArrayList.wrap(new Callable[blocks2InfoSize - GOOD_CACHE_SIZE], blocks2InfoSize - GOOD_CACHE_SIZE);
+			for (int i = 0; i < blocks2InfoSize - GOOD_CACHE_SIZE; i++) {
+				Long2ObjectMap.Entry<BlockInfo> entry = entriesIterator.next();
+				BlockInfo blockInfo = entry.getValue();
+				long blockId = entry.getLongKey();
+				long blockPosition = blockInfo.getIndex();
+				int blockSize = blockInfo.getSize();
+				entriesIterator.remove();
+				tasks.set(i, () -> {
+					try {
+						flusher.flush(blockId, blockPosition, blockSize);
+					} catch (IOException e) {
+						throw new CompletionException(e);
+					}
+					return null;
+				});
+			}
+			try {
+				flushExecutorService.invokeAll(tasks);
+			} catch (InterruptedException e) {
+				throw new IOException(e.getCause());
+			}
 		}
 	}
 	public void close() throws IOException {
 		if (!closed) {
 			closed = true;
-			stopFlushService();
 			lock.writeLock().lock();
 			try {
 				int blocks2InfoSize = blocks2Info.size();
 				ObjectIterator<Long2ObjectMap.Entry<BlockInfo>> entriesIterator = blocks2Info.long2ObjectEntrySet().iterator();
-				List<Future<?>> entriesToFlush = new LinkedList<>();
-				while (blocks2InfoSize > 0) {
+				@SuppressWarnings("unchecked")
+				ObjectArrayList<Callable<Void>> tasks = ObjectArrayList.wrap(new Callable[blocks2InfoSize], blocks2InfoSize);
+				for (int i = 0; i < blocks2InfoSize; i++) {
 					Long2ObjectMap.Entry<BlockInfo> entry = entriesIterator.next();
 					BlockInfo blockInfo = entry.getValue();
-					entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize()));
+					long blockId = entry.getLongKey();
+					long blockPosition = blockInfo.getIndex();
+					int blockSize = blockInfo.getSize();
 					entriesIterator.remove();
-					if (entriesToFlush.size() >= 1000) {
-						awaitFlushWriteEnd(entriesToFlush);
-					}
-					blocks2InfoSize--;
+					tasks.set(i, () -> {
+						try {
+							flusher.flush(blockId, blockPosition, blockSize);
+						} catch (IOException e) {
+							throw new CompletionException(e);
						}
+						return null;
+					});
+				}
+				try {
+					flushExecutorService.invokeAll(tasks);
+				} catch (InterruptedException e) {
+					throw new IOException(e.getCause());
+				}
+				flushExecutorService.shutdown();
+				try {
+					if (!flushExecutorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS))
+						flushExecutorService.shutdownNow();
+				} catch (InterruptedException e) {
+					throw new IOException(e);
 				}
-				awaitFlushWriteEnd(entriesToFlush);
 			} finally {
 				lock.writeLock().unlock();
 			}
 		}
 	}
-	private void stopFlushService() {
-		flushService.shutdown();
-		try { flushService.awaitTermination(180, TimeUnit.SECONDS); } catch (InterruptedException e) {}
-		if (!flushService.isTerminated()) {
-			flushService.shutdownNow();
-		}
-	}
-	private void awaitFlushWriteEnd(List<Future<?>> entriesToFlush) throws IOException {
-		try {
-			entriesToFlush.parallelStream().forEach((entry) -> {
-				try {
-					entry.get();
-				} catch (InterruptedException e) {
-					throw new CompletionException(e);
-				} catch (ExecutionException e) {
-					throw new CompletionException(e.getCause());
-				}
-			});
-		} catch (CompletionException e) {
-			throw new IOException(e.getCause());
-		} finally {
-			entriesToFlush.clear();
-		}
-	}
 }
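This class carries the commit's central change: the single-thread flushService and its Future bookkeeping are gone, and eviction now builds a batch of Callable<Void> tasks that run on a fixed-size pool via invokeAll, which blocks until every write has completed. A reduced, self-contained sketch of that pattern; the Flusher interface and the dummy entries are assumptions of this example:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

public class InvokeAllFlushExample {
    interface Flusher {
        void flush(long key, long value) throws IOException;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(
                ForkJoinPool.getCommonPoolParallelism(),
                r -> new Thread(r, "Flush Thread"));
        Flusher flusher = (key, value) -> System.out.println("flushed " + key + " -> " + value);

        List<Callable<Void>> tasks = new ArrayList<>();
        for (long key = 0; key < 8; key++) {
            final long k = key;       // lambdas may only capture effectively final locals
            final long v = key * 10;
            tasks.add(() -> {
                try {
                    flusher.flush(k, v);
                } catch (IOException e) {
                    throw new CompletionException(e);
                }
                return null;
            });
        }
        pool.invokeAll(tasks); // blocks until every task has finished (or failed)
        pool.shutdown();
    }
}

Note that invokeAll itself only throws InterruptedException; a task's CompletionException is captured in the returned futures, so, as in the diff above, a failed flush goes unnoticed unless those futures are inspected with get().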

it/cavallium/strangedb/database/blocks/DatabaseBlocksMetadataCacheFlusher.java

@@ -4,5 +4,5 @@ import java.io.IOException;
 import java.util.concurrent.Future;
 public interface DatabaseBlocksMetadataCacheFlusher {
-	Future<Integer> flush(long key, long value1, int value2) throws IOException;
+	void flush(long key, long value1, int value2) throws IOException;
 }

it/cavallium/strangedb/database/references/DatabaseReferencesIO.java

@@ -5,7 +5,6 @@ import it.cavallium.strangedb.database.blocks.DatabaseBlocksIO;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID;
@@ -45,12 +44,13 @@ public class DatabaseReferencesIO implements IReferencesIO {
 	@Override
 	public ByteBuffer readFromReference(long reference) throws IOException {
+		long blockId;
 		lock.readLock().lock();
 		try {
-			long blockId = referencesMetadata.getReferenceBlockId(reference);
-			return blocksIO.readBlock(blockId);
+			blockId = referencesMetadata.getReferenceBlockId(reference);
 		} finally {
 			lock.readLock().unlock();
 		}
+		return blocksIO.readBlock(blockId);
 	}
 }
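readFromReference now resolves the block id inside the read lock but performs the actual block read after releasing it, so a slow file read no longer blocks writers. A sketch of that lock-narrowing shape with stand-in types (the map and the readBlock body are assumptions):

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class NarrowCriticalSectionExample {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
    private final Map<Long, Long> referenceToBlock = new HashMap<>();

    ByteBuffer readFromReference(long reference) {
        long blockId;
        lock.readLock().lock();
        try {
            // only the metadata lookup needs to hold the lock
            blockId = referenceToBlock.getOrDefault(reference, -1L);
        } finally {
            lock.readLock().unlock();
        }
        // the (potentially slow) data read no longer serializes against writers
        return readBlock(blockId);
    }

    private ByteBuffer readBlock(long blockId) {
        return ByteBuffer.allocate(0); // stand-in for the real file read
    }
}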

it/cavallium/strangedb/database/references/DatabaseReferencesMetadata.java

@@ -7,11 +7,10 @@ import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousFileChannel;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
-import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static it.cavallium.strangedb.database.IBlocksMetadata.EMPTY_BLOCK_ID;
@@ -27,14 +26,14 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata {
 	private final AsynchronousFileChannel metaFileChannel;
 	private final DatabaseReferencesMetadataCache cache;
-	private long firstFreeReference;
-	private long lastWrittenReference;
+	private AtomicLong firstFreeReference;
+	private AtomicLong lastWrittenReference;
 	private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
 	public DatabaseReferencesMetadata(Path refMetaFile) throws IOException {
 		metaFileChannel = AsynchronousFileChannel.open(refMetaFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
-		firstFreeReference = metaFileChannel.size() / REF_META_BYTES_COUNT;
-		lastWrittenReference = firstFreeReference - 1;
+		firstFreeReference = new AtomicLong(metaFileChannel.size() / REF_META_BYTES_COUNT);
+		lastWrittenReference = new AtomicLong(firstFreeReference.get() - 1);
 		this.cache = new DatabaseReferencesMetadataCache(this::writeReferenceToDisk);
 	}
@@ -49,6 +48,7 @@
 	}
 	private long getReferenceBlockId_(long reference) throws IOException {
+		long firstFreeReference = this.firstFreeReference.get();
 		if (reference >= firstFreeReference) {
 			return EMPTY_BLOCK_ID;
 		}
@@ -108,18 +108,13 @@
 				}
 			}
 		}
-		for (int delta = 0; delta < referencesCount; delta++) {
-			if (allCleaners[delta] == 0) {
-				System.out.println("ro");
-			}
-		}
 		cache.putAll(allReferences, allCleaners, allBlocks);
 		return block;
 	}
 	/**
 	 * This method is <b>SLOW</b>! Use this only for the cleaner
-	 *
 	 * @param reference reference
 	 * @return
 	 * @throws IOException
@@ -139,13 +134,14 @@
 	@Override
 	public long newReference(long blockId) throws IOException {
 		lock.writeLock().lock();
+		long newReference;
 		try {
-			long newReference = firstFreeReference++;
-			cache.put(newReference, BLANK_DATA_CLEANER, blockId);
-			return newReference;
+			newReference = firstFreeReference.getAndIncrement();
 		} finally {
 			lock.writeLock().unlock();
 		}
+		cache.put(newReference, BLANK_DATA_CLEANER, blockId);
+		return newReference;
 	}
 	@Override
@@ -181,30 +177,55 @@
 	@Override
 	public long getFirstFreeReference() {
-		synchronized (lock.readLock()) {
-			return firstFreeReference;
+		lock.readLock().lock();
+		try {
+			return firstFreeReference.get();
+		} finally {
+			lock.readLock().unlock();
 		}
 	}
-	private Future<Integer> writeReferenceToDisk(long reference, byte cleanerId, long blockId) {
+	private void writeReferenceToDisk(long reference, byte cleanerId, long blockId) throws IOException {
 		if (cleanerId == ERRORED_CLEANER) {
-			return CompletableFuture.failedFuture(new IOException("Passing a cleaner with the id of ERRORED_CLIENT"));
+			throw new IOException("Passing a cleaner with the id of ERRORED_CLIENT");
 		}
 		ByteBuffer data = ByteBuffer.allocate(REF_META_BYTES_COUNT);
 		data.putLong(blockId);
 		data.putInt(cleanerId & 0xFF);
 		data.flip();
-		while (lastWrittenReference < reference - 1) {
-			ByteBuffer emptyData = ByteBuffer.allocate(REF_META_BYTES_COUNT);
-			emptyData.putLong(ERROR_BLOCK_ID);
-			emptyData.putInt(ERRORED_CLEANER & 0xFF);
-			emptyData.flip();
-			metaFileChannel.write(emptyData, ++lastWrittenReference * REF_META_BYTES_COUNT);
-		}
-		if (reference > lastWrittenReference) {
-			lastWrittenReference = reference;
-		}
-		return metaFileChannel.write(data, reference * REF_META_BYTES_COUNT);
+		try {
+			metaFileChannel.write(data, reference * REF_META_BYTES_COUNT).get();
+		} catch (InterruptedException e) {
+			throw new IOException(e);
+		} catch (ExecutionException e) {
+			throw new IOException(e.getCause());
+		}
 	}
+	/*
+	private void writeReferenceToDisk(long reference, byte cleanerId, long blockId) throws IOException {
+		if (cleanerId == ERRORED_CLEANER) {
+			throw new IOException("Passing a cleaner with the id of ERRORED_CLIENT");
+		}
+		long firstReferenceToWrite = 1 + lastWrittenReference.getAndUpdate((lastWrittenReferenceVal) -> reference > lastWrittenReferenceVal ? reference : lastWrittenReferenceVal);
+		if (firstReferenceToWrite > reference) {
+			firstReferenceToWrite = reference;
+		}
+		ByteBuffer data = ByteBuffer.allocate((int) ((reference + 1 - firstReferenceToWrite) * REF_META_BYTES_COUNT));
+		for (long i = firstReferenceToWrite; i < reference - 1; i++) {
+			data.putLong(ERROR_BLOCK_ID);
+			data.putInt(ERRORED_CLEANER & 0xFF);
+		}
+		data.putLong(blockId);
+		data.putInt(cleanerId & 0xFF);
+		data.flip();
+		try {
+			metaFileChannel.write(data, firstReferenceToWrite * REF_META_BYTES_COUNT).get();
+		} catch (InterruptedException e) {
+			throw new IOException(e);
+		} catch (ExecutionException e) {
+			throw new IOException(e.getCause());
+		}
+	}
+	*/
 }
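With firstFreeReference and lastWrittenReference turned into AtomicLongs, allocating an id is a single atomic step, and the commented-out variant above sketches batching the gap-filling writes around getAndUpdate. A small worked example of the two operations (all values are illustrative):

import java.util.concurrent.atomic.AtomicLong;

public class AtomicCounterExample {
    public static void main(String[] args) {
        // newReference(): claim the next free slot atomically
        AtomicLong firstFreeReference = new AtomicLong(100);
        long newReference = firstFreeReference.getAndIncrement(); // 100; counter is now 101

        // batched variant: advance lastWrittenReference to `reference` only if it is
        // larger, and learn the previous value in the same atomic step
        AtomicLong lastWrittenReference = new AtomicLong(99);
        long reference = 105;
        long previous = lastWrittenReference.getAndUpdate(
                last -> reference > last ? reference : last); // returns 99, stores 105
        long firstReferenceToWrite = previous + 1;             // references 100..105 need writing

        System.out.println(newReference + " " + previous + " " + firstReferenceToWrite);
    }
}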

it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCache.java

@@ -1,10 +1,14 @@
 package it.cavallium.strangedb.database.references;
+import it.cavallium.strangedb.VariableWrapper;
 import it.unimi.dsi.fastutil.longs.*;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import it.unimi.dsi.fastutil.objects.ObjectIterators;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.*;
@@ -12,7 +16,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static it.cavallium.strangedb.database.IBlocksMetadata.ERROR_BLOCK_ID;
 import static it.cavallium.strangedb.database.blocks.DatabaseBlocksMetadata.BLOCK_META_READS_AT_EVERY_READ;
-import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.*;
+import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.NONEXISTENT_REFERENCE_INFO;
+import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.REF_META_READS_AT_EVERY_READ;
 public class DatabaseReferencesMetadataCache {
@@ -21,13 +26,12 @@ public class DatabaseReferencesMetadataCache {
 	private static final int FLUSH_CACHE_SIZE = 300 * BLOCK_META_READS_AT_EVERY_READ;
 	private static final int MAX_CACHE_SIZE = 400 * BASE_QUANTITY;
-	private final Long2LongMap references2Blocks = new Long2LongLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f);
-	private final Long2ByteMap referencesCleaners = new Long2ByteLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f);
+	private final Long2LongMap references2Blocks = Long2LongMaps.synchronize(new Long2LongLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f));
+	private final Long2ByteMap referencesCleaners = Long2ByteMaps.synchronize(new Long2ByteLinkedOpenHashMap(MAX_CACHE_SIZE, 0.5f));
 	private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
 	private final DatabaseReferencesMetadataCacheFlusher flusher;
 	private volatile boolean closed;
-	private ExecutorService flushService = Executors.newSingleThreadExecutor();
-	private volatile boolean flushing;
+	ExecutorService flushExecutorService = Executors.newFixedThreadPool(ForkJoinPool.getCommonPoolParallelism(), (r) -> new Thread(r, "References Flush Thread"));
 	public DatabaseReferencesMetadataCache(DatabaseReferencesMetadataCacheFlusher flusher) {
 		this.flusher = flusher;
@@ -67,9 +71,9 @@
 			}
 			references2Blocks.put(reference, blockId);
 			referencesCleaners.put(reference, cleanerId);
+			flush();
 		} finally {
 			lock.writeLock().unlock();
-			flushAsync();
 		}
 	}
@@ -86,110 +90,88 @@
 			Long2ByteMap referencesCleanersToAdd = new Long2ByteLinkedOpenHashMap(references, cleanerIds, 0.5f);
 			references2Blocks.putAll(referencesBlocksToAdd);
 			referencesCleaners.putAll(referencesCleanersToAdd);
+			flush();
 		} finally {
 			lock.writeLock().unlock();
-			flushAsync();
-		}
-	}
-	private void flushAsync() {
-		if (references2Blocks.size() >= FLUSH_CACHE_SIZE && !flushing) {
-			flushing = true;
-			flushService.execute(() -> {
-				try {
-					flush();
-				} catch (IOException e) {
-					throw new RejectedExecutionException(e);
-				}
-			});
 		}
 	}
 	private void flush() throws IOException {
-		if (closed) {
-			flushing = false;
-			return;
-		}
+		if (closed) return;
 		int references2BlocksSize = references2Blocks.size();
 		if (references2BlocksSize >= FLUSH_CACHE_SIZE) {
-			lock.writeLock().lock();
-			try {
-				ObjectIterator<Long2LongMap.Entry> entriesIterator = references2Blocks.long2LongEntrySet().iterator();
-				ObjectIterator<Long2ByteMap.Entry> cleanersIterator = referencesCleaners.long2ByteEntrySet().iterator();
-				List<Future<?>> entriesToFlush = new ArrayList<>();
-				while (references2BlocksSize >= GOOD_CACHE_SIZE) {
-					Long2LongMap.Entry entry = entriesIterator.next();
-					Long2ByteMap.Entry cleaner = cleanersIterator.next();
-					entriesToFlush.add(flusher.flush(entry.getLongKey(), cleaner.getByteValue(), entry.getLongValue()));
-					entriesIterator.remove();
-					cleanersIterator.remove();
-					if (entriesToFlush.size() >= 1000) {
-						awaitFlushWriteEnd(entriesToFlush);
-					}
-					references2BlocksSize--;
-				}
-				awaitFlushWriteEnd(entriesToFlush);
-			} finally {
-				flushing = false;
-				lock.writeLock().unlock();
-			}
-		} else {
-			flushing = false;
+			ObjectIterator<Long2LongMap.Entry> entriesIterator = references2Blocks.long2LongEntrySet().iterator();
+			ObjectIterator<Long2ByteMap.Entry> cleanersIterator = referencesCleaners.long2ByteEntrySet().iterator();
+			@SuppressWarnings("unchecked")
+			ObjectArrayList<Callable<Void>> tasks = ObjectArrayList.wrap(new Callable[references2BlocksSize - GOOD_CACHE_SIZE], references2BlocksSize - GOOD_CACHE_SIZE);
+			for (int i = 0; i < references2BlocksSize - GOOD_CACHE_SIZE; i++) {
+				Long2LongMap.Entry entry = entriesIterator.next();
+				Long2ByteMap.Entry cleaner = cleanersIterator.next();
+				long refId = entry.getLongKey();
+				byte cleanerId = cleaner.getByteValue();
+				long blockId = entry.getLongValue();
+				entriesIterator.remove();
+				cleanersIterator.remove();
+				tasks.set(i, () -> {
+					try {
+						flusher.flush(refId, cleanerId, blockId);
+					} catch (IOException e) {
+						throw new CompletionException(e);
+					}
+					return null;
+				});
+			}
+			try {
+				flushExecutorService.invokeAll(tasks);
+			} catch (InterruptedException e) {
+				throw new IOException(e.getCause());
+			}
 		}
 	}
 	public void close() throws IOException {
 		if (!closed) {
 			closed = true;
-			stopFlushService();
 			lock.writeLock().lock();
 			try {
 				int references2BlocksSize = references2Blocks.size();
 				ObjectIterator<Long2LongMap.Entry> entriesIterator = references2Blocks.long2LongEntrySet().iterator();
 				ObjectIterator<Long2ByteMap.Entry> cleanersIterator = referencesCleaners.long2ByteEntrySet().iterator();
-				List<Future<?>> entriesToFlush = new LinkedList<>();
-				while (references2BlocksSize > 0) {
+				@SuppressWarnings("unchecked")
+				ObjectArrayList<Callable<Void>> tasks = ObjectArrayList.wrap(new Callable[references2BlocksSize], references2BlocksSize);
+				for (int i = 0; i < references2BlocksSize; i++) {
 					Long2LongMap.Entry entry = entriesIterator.next();
 					Long2ByteMap.Entry cleaner = cleanersIterator.next();
-					entriesToFlush.add(flusher.flush(entry.getLongKey(), cleaner.getByteValue(), entry.getLongValue()));
+					long refId = entry.getLongKey();
+					byte cleanerId = cleaner.getByteValue();
+					long blockId = entry.getLongValue();
 					entriesIterator.remove();
 					cleanersIterator.remove();
-					if (entriesToFlush.size() >= 1000) {
-						awaitFlushWriteEnd(entriesToFlush);
-					}
-					references2BlocksSize--;
+					tasks.set(i, () -> {
+						try {
+							flusher.flush(refId, cleanerId, blockId);
+						} catch (IOException e) {
+							throw new CompletionException(e);
						}
+						return null;
+					});
+				}
+				try {
+					flushExecutorService.invokeAll(tasks);
+				} catch (InterruptedException e) {
+					throw new IOException(e.getCause());
+				}
+				flushExecutorService.shutdown();
+				try {
+					if (!flushExecutorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS))
+						flushExecutorService.shutdownNow();
+				} catch (InterruptedException e) {
+					throw new IOException(e);
 				}
-				awaitFlushWriteEnd(entriesToFlush);
 			} finally {
 				lock.writeLock().unlock();
 			}
 		}
 	}
-	private void stopFlushService() {
-		flushService.shutdown();
-		try { flushService.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) {}
-		if (!flushService.isTerminated()) {
-			flushService.shutdownNow();
-		}
-	}
-	private void awaitFlushWriteEnd(List<Future<?>> entriesToFlush) throws IOException {
-		try {
-			entriesToFlush.parallelStream().forEach((entry) -> {
-				try {
-					entry.get();
-				} catch (InterruptedException e) {
-					throw new CompletionException(e);
-				} catch (ExecutionException e) {
-					throw new CompletionException(e.getCause());
-				}
-			});
-		} catch (CompletionException e) {
-			throw new IOException(e.getCause());
-		} finally {
-			entriesToFlush.clear();
-		}
-	}
 }
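close() drains the cache through the same invokeAll path and then retires the pool with the standard shutdown sequence. Isolated as a sketch below; the effectively infinite timeout mirrors the diff, while the re-interrupt is an addition of this example:

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownExample {
    public static void main(String[] args) throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        pool.shutdown(); // stop accepting new tasks; queued work keeps running
        try {
            if (!pool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)) {
                pool.shutdownNow(); // cancel stragglers (unreachable with this timeout)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }
}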

it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCacheFlusher.java

@@ -4,5 +4,5 @@ import java.io.IOException;
 import java.util.concurrent.Future;
 public interface DatabaseReferencesMetadataCacheFlusher {
-	Future<Integer> flush(long reference, byte cleanerId, long block) throws IOException;
+	void flush(long reference, byte cleanerId, long block) throws IOException;
 }