diff --git a/pom.xml b/pom.xml index 57f9d21..21d207d 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ org.warp jcwdb - 1.5.0 + 1.5.1 jcwdb https://git.ignuranza.net/andreacavalli/JCWDB diff --git a/src/main/java/org/warp/cowdb/database/DatabaseBlocksIO.java b/src/main/java/org/warp/cowdb/database/DatabaseBlocksIO.java index 0abca87..e6830ce 100644 --- a/src/main/java/org/warp/cowdb/database/DatabaseBlocksIO.java +++ b/src/main/java/org/warp/cowdb/database/DatabaseBlocksIO.java @@ -21,6 +21,9 @@ public class DatabaseBlocksIO implements IBlocksIO { @Override public long newBlock(int size, ByteBuffer data) throws IOException { + if (size == 0) { + return EMPTY_BLOCK_ID; + } long index = fileIO.writeAtEnd(size, data); return blocksMetadata.newBlock(index, size); } diff --git a/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadata.java b/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadata.java index 31e2570..d878a61 100644 --- a/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadata.java +++ b/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadata.java @@ -47,12 +47,15 @@ public class DatabaseBlocksMetadata implements IBlocksMetadata { long index = buffer.getLong(); int size = buffer.getInt(); blockInfo = new BlockInfo(index, size); - cache.put(index, blockInfo); + cache.put(blockId, blockInfo); return blockInfo; } @Override public long newBlock(long index, int size) throws IOException { + if (size == 0) { + return EMPTY_BLOCK_ID; + } long newBlockId = firstFreeBlock++; BlockInfo blockInfo = new BlockInfo(index, size); cache.put(newBlockId, blockInfo); diff --git a/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadataCache.java b/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadataCache.java index 3250cdc..257d272 100644 --- a/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadataCache.java +++ b/src/main/java/org/warp/cowdb/database/DatabaseBlocksMetadataCache.java @@ -37,25 +37,16 @@ public class DatabaseBlocksMetadataCache { } } - public void put(long block, BlockInfo value) throws IOException { + public void put(long block, BlockInfo blockInfo) throws IOException { if (closed) return; synchronized (readAccessLock) { synchronized (writeAccessLock) { - blocks2Info.put(block, value); + blocks2Info.put(block, blockInfo); } } this.flush(); } - public void remove(long block) { - if (closed) return; - synchronized (readAccessLock) { - synchronized (writeAccessLock) { - blocks2Info.remove(block); - } - } - } - private void flush() throws IOException { if (closed) return; int blocks2InfoSize = blocks2Info.size(); @@ -68,17 +59,12 @@ public class DatabaseBlocksMetadataCache { BlockInfo blockInfo = entry.getValue(); entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize())); entriesIterator.remove(); + if (entriesToFlush.size() >= 1000) { + executeAsyncFlush(entriesToFlush); + } blocks2InfoSize--; } - try { - for (Future entryToFlush : entriesToFlush) { - entryToFlush.get(); - } - } catch (InterruptedException e) { - throw new IOException(e); - } catch (ExecutionException e) { - throw new IOException(e.getCause()); - } + executeAsyncFlush(entriesToFlush); } } } @@ -95,19 +81,29 @@ public class DatabaseBlocksMetadataCache { Long2ObjectMap.Entry entry = entriesIterator.next(); BlockInfo blockInfo = entry.getValue(); entriesToFlush.add(flusher.flush(entry.getLongKey(), blockInfo.getIndex(), blockInfo.getSize())); + entriesIterator.remove(); + if (entriesToFlush.size() >= 1000) { + executeAsyncFlush(entriesToFlush); + } blocks2InfoSize--; } - try { - for (Future entryToFlush : entriesToFlush) { - entryToFlush.get(); - } - } catch (InterruptedException e) { - throw new IOException(e); - } catch (ExecutionException e) { - throw new IOException(e.getCause()); - } + executeAsyncFlush(entriesToFlush); } } } } + + private void executeAsyncFlush(List> entriesToFlush) throws IOException { + try { + for (Future entryToFlush : entriesToFlush) { + entryToFlush.get(); + } + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } finally { + entriesToFlush.clear(); + } + } } diff --git a/src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadataCache.java b/src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadataCache.java index 39d6f7a..3004c00 100644 --- a/src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadataCache.java +++ b/src/main/java/org/warp/cowdb/database/DatabaseReferencesMetadataCache.java @@ -27,52 +27,42 @@ public class DatabaseReferencesMetadataCache { } public long get(long reference) throws IOException { - if (closed) throw new IOException("Cache already closed!"); synchronized (readAccessLock) { + if (closed) throw new IOException("Cache already closed!"); return references2Blocks.getOrDefault(reference, ERROR_BLOCK_ID); } } - public void put(long reference, long value) throws IOException { - if (closed) return; + public void put(long reference, long blockId) throws IOException { synchronized (readAccessLock) { synchronized (writeAccessLock) { - references2Blocks.put(reference, value); + if (closed) return; + references2Blocks.put(reference, blockId); } } this.flush(); } - public void remove(long reference) { - if (closed) return; + private void flush() throws IOException { synchronized (readAccessLock) { synchronized (writeAccessLock) { - references2Blocks.remove(reference); - } - } - } - - private void flush() throws IOException { - if (closed) return; - int references2BlocksSize = references2Blocks.size(); - if (references2BlocksSize > MAX_CACHE_SIZE) { - synchronized (writeAccessLock) { - ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); - List> entriesToFlush = new LinkedList<>(); - while (references2BlocksSize > GOOD_CACHE_SIZE) { - Long2LongMap.Entry entry = entriesIterator.next(); - entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue())); - entriesIterator.remove(); - references2BlocksSize--; - } - try { - for (Future entryToFlush : entriesToFlush) { - entryToFlush.get(); + if (closed) return; + int references2BlocksSize = references2Blocks.size(); + if (references2BlocksSize > MAX_CACHE_SIZE) { + synchronized (writeAccessLock) { + ObjectIterator entriesIterator = references2Blocks.long2LongEntrySet().iterator(); + List> entriesToFlush = new LinkedList<>(); + while (references2BlocksSize > GOOD_CACHE_SIZE) { + Long2LongMap.Entry entry = entriesIterator.next(); + entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue())); + entriesIterator.remove(); + if (entriesToFlush.size() >= 1000) { + executeAsyncFlush(entriesToFlush); + } + references2BlocksSize--; + } + executeAsyncFlush(entriesToFlush); } - } catch (InterruptedException e) { - throw new IOException(e); - } catch (ExecutionException e) { - throw new IOException(e.getCause()); } } } @@ -89,17 +79,31 @@ public class DatabaseReferencesMetadataCache { while (references2BlocksSize > 0) { Long2LongMap.Entry entry = entriesIterator.next(); entriesToFlush.add(flusher.flush(entry.getLongKey(), entry.getLongValue())); + entriesIterator.remove(); + if (entriesToFlush.size() >= 1000) { + executeAsyncFlush(entriesToFlush); + } references2BlocksSize--; } - try { - for (Future entryToFlush : entriesToFlush) { - entryToFlush.get(); - } - } catch (InterruptedException e) { - throw new IOException(e); - } catch (ExecutionException e) { - throw new IOException(e.getCause()); + executeAsyncFlush(entriesToFlush); + } + } + } + } + + private void executeAsyncFlush(List> entriesToFlush) throws IOException { + synchronized (readAccessLock) { + synchronized (writeAccessLock) { + try { + for (Future entryToFlush : entriesToFlush) { + entryToFlush.get(); } + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } finally { + entriesToFlush.clear(); } } }