Important Bugfix. v1.5.1

Block ID cache was inverting block ids with file offsets
This commit is contained in:
Andrea Cavalli 2019-03-03 23:48:03 +01:00
parent d4599ca495
commit d8bfea0a3f
5 changed files with 76 additions and 70 deletions

View File

@ -6,7 +6,7 @@
<groupId>org.warp</groupId> <groupId>org.warp</groupId>
<artifactId>jcwdb</artifactId> <artifactId>jcwdb</artifactId>
<version>1.5.0</version> <version>1.5.1</version>
<name>jcwdb</name> <name>jcwdb</name>
<url>https://git.ignuranza.net/andreacavalli/JCWDB</url> <url>https://git.ignuranza.net/andreacavalli/JCWDB</url>

View File

@ -21,6 +21,9 @@ public class DatabaseBlocksIO implements IBlocksIO {
@Override @Override
public long newBlock(int size, ByteBuffer data) throws IOException { public long newBlock(int size, ByteBuffer data) throws IOException {
if (size == 0) {
return EMPTY_BLOCK_ID;
}
long index = fileIO.writeAtEnd(size, data); long index = fileIO.writeAtEnd(size, data);
return blocksMetadata.newBlock(index, size); return blocksMetadata.newBlock(index, size);
} }

View File

@ -47,12 +47,15 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata {
long index = buffer.getLong(); long index = buffer.getLong();
int size = buffer.getInt(); int size = buffer.getInt();
blockInfo = new BlockInfo(index, size); blockInfo = new BlockInfo(index, size);
cache.put(index, blockInfo); cache.put(blockId, blockInfo);
return blockInfo; return blockInfo;
} }
@Override @Override
public long newBlock(long index, int size) throws IOException { public long newBlock(long index, int size) throws IOException {
if (size == 0) {
return EMPTY_BLOCK_ID;
}
long newBlockId = firstFreeBlock++; long newBlockId = firstFreeBlock++;
BlockInfo blockInfo = new BlockInfo(index, size); BlockInfo blockInfo = new BlockInfo(index, size);
cache.put(newBlockId, blockInfo); cache.put(newBlockId, blockInfo);

View File

@ -37,25 +37,16 @@ public class DatabaseBlocksMetadataCache {
} }
} }
public void put(long block, BlockInfo value) throws IOException { public void put(long block, BlockInfo blockInfo) throws IOException {
if (closed) return; if (closed) return;
synchronized (readAccessLock) { synchronized (readAccessLock) {
synchronized (writeAccessLock) { synchronized (writeAccessLock) {
blocks2Info.put(block, value); blocks2Info.put(block, blockInfo);
} }
} }
this.flush(); this.flush();
} }
public void remove(long block) {
if (closed) return;
synchronized (readAccessLock) {
synchronized (writeAccessLock) {
blocks2Info.remove(block);
}
}
}
private void flush() throws IOException { private void flush() throws IOException {
if (closed) return; if (closed) return;
int blocks2InfoSize = blocks2Info.size(); int blocks2InfoSize = blocks2Info.size();
@ -68,17 +59,12 @@ public class DatabaseBlocksMetadataCache {
BlockInfo blockInfo = entry.getValue(); BlockInfo blockInfo = entry.getValue();
entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize())); entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize()));
entriesIterator.remove(); entriesIterator.remove();
if (entriesToFlush.size() >= 1000) {
executeAsyncFlush(entriesToFlush);
}
blocks2InfoSize--; blocks2InfoSize--;
} }
try { executeAsyncFlush(entriesToFlush);
for (Future<?> entryToFlush : entriesToFlush) {
entryToFlush.get();
}
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
} }
} }
} }
@ -95,19 +81,29 @@ public class DatabaseBlocksMetadataCache {
Long2ObjectMap.Entry<BlockInfo> entry = entriesIterator.next(); Long2ObjectMap.Entry<BlockInfo> entry = entriesIterator.next();
BlockInfo blockInfo = entry.getValue(); BlockInfo blockInfo = entry.getValue();
entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize())); entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize()));
entriesIterator.remove();
if (entriesToFlush.size() >= 1000) {
executeAsyncFlush(entriesToFlush);
}
blocks2InfoSize--; blocks2InfoSize--;
} }
try { executeAsyncFlush(entriesToFlush);
for (Future<?> entryToFlush : entriesToFlush) {
entryToFlush.get();
}
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
} }
} }
} }
} }
private void executeAsyncFlush(List<Future<?>> entriesToFlush) throws IOException {
try {
for (Future<?> entryToFlush : entriesToFlush) {
entryToFlush.get();
}
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
} finally {
entriesToFlush.clear();
}
}
} }

View File

@ -27,52 +27,42 @@ public class DatabaseReferencesMetadataCache {
} }
public long get(long reference) throws IOException { public long get(long reference) throws IOException {
if (closed) throw new IOException("Cache already closed!");
synchronized (readAccessLock) { synchronized (readAccessLock) {
if (closed) throw new IOException("Cache already closed!");
return references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID); return references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID);
} }
} }
public void put(long reference, long value) throws IOException { public void put(long reference, long blockId) throws IOException {
if (closed) return;
synchronized (readAccessLock) { synchronized (readAccessLock) {
synchronized (writeAccessLock) { synchronized (writeAccessLock) {
references2Blocks.put(reference, value); if (closed) return;
references2Blocks.put(reference, blockId);
} }
} }
this.flush(); this.flush();
} }
public void remove(long reference) { private void flush() throws IOException {
if (closed) return;
synchronized (readAccessLock) { synchronized (readAccessLock) {
synchronized (writeAccessLock) { synchronized (writeAccessLock) {
references2Blocks.remove(reference); if (closed) return;
} int references2BlocksSize = references2Blocks.size();
} if (references2BlocksSize > MAX_CACHE_SIZE) {
} synchronized (writeAccessLock) {
ObjectIterator<Long2LongMap.Entry> entriesIterator = references2Blocks.long2LongEntrySet().iterator();
private void flush() throws IOException { List<Future<?>> entriesToFlush = new LinkedList<>();
if (closed) return; while (references2BlocksSize > GOOD_CACHE_SIZE) {
int references2BlocksSize = references2Blocks.size(); Long2LongMap.Entry entry = entriesIterator.next();
if (references2BlocksSize > MAX_CACHE_SIZE) { entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue()));
synchronized (writeAccessLock) { entriesIterator.remove();
ObjectIterator<Long2LongMap.Entry> entriesIterator = references2Blocks.long2LongEntrySet().iterator(); if (entriesToFlush.size() >= 1000) {
List<Future<?>> entriesToFlush = new LinkedList<>(); executeAsyncFlush(entriesToFlush);
while (references2BlocksSize > GOOD_CACHE_SIZE) { }
Long2LongMap.Entry entry = entriesIterator.next(); references2BlocksSize--;
entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue())); }
entriesIterator.remove(); executeAsyncFlush(entriesToFlush);
references2BlocksSize--;
}
try {
for (Future<?> entryToFlush : entriesToFlush) {
entryToFlush.get();
} }
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
} }
} }
} }
@ -89,17 +79,31 @@ public class DatabaseReferencesMetadataCache {
while (references2BlocksSize > 0) { while (references2BlocksSize > 0) {
Long2LongMap.Entry entry = entriesIterator.next(); Long2LongMap.Entry entry = entriesIterator.next();
entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue())); entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue()));
entriesIterator.remove();
if (entriesToFlush.size() >= 1000) {
executeAsyncFlush(entriesToFlush);
}
references2BlocksSize--; references2BlocksSize--;
} }
try { executeAsyncFlush(entriesToFlush);
for (Future<?> entryToFlush : entriesToFlush) { }
entryToFlush.get(); }
} }
} catch (InterruptedException e) { }
throw new IOException(e);
} catch (ExecutionException e) { private void executeAsyncFlush(List<Future<?>> entriesToFlush) throws IOException {
throw new IOException(e.getCause()); synchronized (readAccessLock) {
synchronized (writeAccessLock) {
try {
for (Future<?> entryToFlush : entriesToFlush) {
entryToFlush.get();
} }
} catch (InterruptedException e) {
throw new IOException(e);
} catch (ExecutionException e) {
throw new IOException(e.getCause());
} finally {
entriesToFlush.clear();
} }
} }
} }