package org.warp.jcwdb;

import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import it.unimi.dsi.fastutil.longs.*;
import it.unimi.dsi.fastutil.objects.ObjectIterator;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/**
 * {@link IndexManager} backed by two files: a data file containing the serialized
 * values, and a metadata file containing one fixed-size {@link IndexDetails} record
 * per index (record {@code i} lives at offset {@code i * IndexDetails.TOTAL_BYTES}).
 *
 * <p>In-memory state (loaded/dirty/removed index sets) is guarded by
 * {@code indicesMapsAccessLock}; the shared scratch byte buffers each have their own
 * lock. Dirty metadata is written back by {@link #clean()}, {@link #flushAndUnload(long)}
 * and {@link #close()}.
 */
public class FileIndexManager implements IndexManager {
    private final SeekableByteChannel dataFileChannel, metadataFileChannel;
    // Logical size of the metadata file; grown eagerly by resizeMetadataFileChannel().
    private volatile long metadataFileChannelSize;
    private final FileAllocator fileAllocator;
    // Scratch buffers shared by all metadata reads/writes; guarded by the locks below.
    private final ByteBuffer metadataByteBuffer = ByteBuffer.allocateDirect(IndexDetails.TOTAL_BYTES);
    private final ByteBuffer maskByteBuffer = ByteBuffer.allocateDirect(Integer.BYTES);
    private volatile boolean closed;
    private final Object closeLock = new Object();
    private final Object metadataByteBufferLock = new Object();
    private final Object maskByteBufferLock = new Object();
    private final Object indicesMapsAccessLock = new Object();

    /**
     * Edit this using editIndex().
     * Get using getIndexMetadata().
     * This hashmap must contain all loaded indices.
     */
    private final Long2ObjectMap<IndexDetails> loadedIndices;
    /**
     * Edit these using editIndex().
     */
    private final LongSet dirtyLoadedIndices, flushingAllowedIndices, removedIndices;
    // Next index number to hand out; index 0 is never allocated.
    private long firstAllocableIndex;

    /**
     * Opens (creating if necessary) the data and metadata files and rebuilds the
     * free-space map of the data file from the metadata records.
     *
     * @param dataFile path of the file holding serialized values
     * @param metadataFile path of the file holding IndexDetails records
     * @throws IOException if the files cannot be created or opened
     */
    public FileIndexManager(Path dataFile, Path metadataFile) throws IOException {
        if (Cleaner.DISABLE_CLEANER) {
            loadedIndices = new Long2ObjectOpenHashMap<>();
            dirtyLoadedIndices = new LongOpenHashSet();
            flushingAllowedIndices = new LongOpenHashSet();
            removedIndices = new LongOpenHashSet();
        } else {
            // Linked variants preserve insertion order so the cleaner can evict oldest-first.
            loadedIndices = new Long2ObjectLinkedOpenHashMap<>();
            dirtyLoadedIndices = new LongLinkedOpenHashSet();
            flushingAllowedIndices = new LongLinkedOpenHashSet();
            removedIndices = new LongLinkedOpenHashSet();
        }
        if (Files.notExists(dataFile)) {
            Files.createFile(dataFile);
        }
        if (Files.notExists(metadataFile)) {
            Files.createFile(metadataFile);
        }
        dataFileChannel = Files.newByteChannel(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
        metadataFileChannel = Files.newByteChannel(metadataFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
        // FIX: the size field must be initialized BEFORE createFileAllocator(), whose scan
        // loop terminates against getMetadataFileChannelSize(); previously the field was
        // still 0 here, so the scan never ran and existing data regions were never marked
        // as used (risking silent overwrites of live data).
        metadataFileChannelSize = metadataFileChannel.size();
        fileAllocator = createFileAllocator(dataFileChannel, metadataFileChannel.position(0));
        firstAllocableIndex = getMetadataFileChannelSize() / (long) IndexDetails.TOTAL_BYTES;
        if (firstAllocableIndex == 0) {
            // Index 0 is reserved; never hand it out.
            firstAllocableIndex = 1;
        }
    }

    private long getMetadataFileChannelSize() throws IOException {
        return metadataFileChannelSize;
    }

    /**
     * Scans every metadata record and rebuilds the data file's used/free region map.
     *
     * @param dataFileChannel channel of the data file (passed to the allocator)
     * @param metadataFileChannel metadata channel, already positioned at 0
     * @return an allocator aware of all regions referenced by live metadata records
     */
    private FileAllocator createFileAllocator(final SeekableByteChannel dataFileChannel,
                                              final SeekableByteChannel metadataFileChannel) throws IOException {
        Long2IntMap freeBytes = new Long2IntRBTreeMap();
        Long2IntMap usedBytes = new Long2IntRBTreeMap();
        while (metadataFileChannel.position() + IndexDetails.TOTAL_BYTES <= getMetadataFileChannelSize()) {
            // readIndexDetailsAt advances the channel by TOTAL_BYTES even for deleted records.
            IndexDetails indexDetails = readIndexDetailsAt(metadataFileChannel);
            if (indexDetails != null) {
                usedBytes.put(indexDetails.getOffset(), indexDetails.getSize());
            }
        }
        // Walk used regions in offset order (RB-tree iteration); every gap between two
        // consecutive used regions is free space.
        long previousEntryOffset = 0;
        long previousEntrySize = 0;
        ObjectIterator<Long2IntMap.Entry> it = usedBytes.long2IntEntrySet().iterator();
        while (it.hasNext()) {
            final Long2IntMap.Entry entry = it.next();
            final long entryOffset = entry.getLongKey();
            final long entrySize = entry.getIntValue();
            it.remove();
            if (previousEntryOffset + previousEntrySize < entryOffset) {
                freeBytes.put(previousEntryOffset + previousEntrySize,
                        (int) (entryOffset - (previousEntryOffset + previousEntrySize)));
            }
            previousEntryOffset = entryOffset;
            previousEntrySize = entrySize;
        }
        // End of the last used region == current logical size of the data file.
        final long fileSize = previousEntryOffset + previousEntrySize;
        return new FileAllocator(dataFileChannel, fileSize, freeBytes);
    }

    /**
     * Reads the value stored at {@code index}.
     *
     * <p>The kryo {@link Input} is deliberately not closed: closing it would close the
     * shared data channel.
     *
     * @throws IOException if the index does not exist
     */
    @Override
    public <T> T get(long index, DBReader<T> reader) throws IOException {
        checkClosed();
        IndexDetails details = getIndexMetadata(index);
        Input i = new Input(Channels.newInputStream(dataFileChannel.position(details.getOffset())));
        return reader.read(i, details.getSize());
    }

    /** Returns the type code stored in the metadata of {@code index}. */
    @Override
    public int getType(long index) throws IOException {
        return getIndexMetadata(index).getType();
    }

    /** Returns the content hash stored in the metadata of {@code index}. */
    @Override
    public long getHash(long index) throws IOException {
        return getIndexMetadata(index).getHash();
    }

    /**
     * Overwrites the value at {@code index}, reusing its current region when the new
     * data fits, otherwise relocating it.
     *
     * @return the (possibly new) metadata of the index
     */
    @Override
    public IndexDetails set(long index, DBDataOutput data) throws IOException {
        checkClosed();
        final int dataSize = data.getSize();
        IndexDetails indexDetails = getIndexMetadataUnsafe(index);
        if (indexDetails == null || indexDetails.getSize() < dataSize) {
            // The new data does not fit in place: allocate new space and write there.
            IndexDetails newDetails = allocateAndWrite(index, data);
            if (indexDetails != null) {
                // Mark free the old bytes
                fileAllocator.markFree(indexDetails.getOffset(), indexDetails.getSize());
            }
            return newDetails;
        } else {
            if (dataSize < indexDetails.getSize()) {
                // FIX: free the unused TAIL of the old region. Previously this freed
                // dataSize bytes instead of (oldSize - dataSize), corrupting the free map.
                fileAllocator.markFree(indexDetails.getOffset() + dataSize,
                        indexDetails.getSize() - dataSize);
            }
            // Update index details (only if something actually changed)
            indexDetails = editIndex(index, indexDetails, indexDetails.getOffset(), dataSize,
                    indexDetails.getType(), data.calculateHash());
            // Write data
            writeExact(indexDetails, data);
            return indexDetails;
        }
    }

    /**
     * Marks whether the cleaner may flush/unload {@code index} in the background.
     */
    @Override
    public void setFlushingAllowed(long index, boolean isUnloadingAllowed) {
        checkClosed();
        // FIX: guard the set mutation like every other access to these maps.
        synchronized (indicesMapsAccessLock) {
            if (isUnloadingAllowed) {
                flushingAllowedIndices.add(index);
            } else {
                flushingAllowedIndices.remove(index);
            }
        }
    }

    /**
     * Stores a new value and returns its freshly allocated index.
     */
    @Override
    public long add(DBDataOutput data) throws IOException {
        checkClosed();
        final int size = data.getSize();
        final long offset = fileAllocator.allocate(size);
        final int type = data.getType();
        final long hash = data.calculateHash();
        final IndexDetails indexDetails = new IndexDetails(offset, size, type, hash);
        final long index = createIndexMetadata(indexDetails);
        writeExact(indexDetails, data);
        return index;
    }

    /**
     * Stores a new value and returns both its index and its metadata.
     */
    @Override
    public FullIndexDetails addAndGetDetails(DBDataOutput data) throws IOException {
        checkClosed();
        final int size = data.getSize();
        final long offset = fileAllocator.allocate(size);
        final int type = data.getType();
        final long hash = data.calculateHash();
        final IndexDetails indexDetails = new IndexDetails(offset, size, type, hash);
        final long index = createIndexMetadata(indexDetails);
        writeExact(indexDetails, data);
        return new FullIndexDetails(index, indexDetails);
    }

    /**
     * Write the data at index.
     * The input size must be equal to the index size!
     *
     * @param indexDetails metadata describing where and how much to write
     * @param data source of the bytes
     * @throws IOException if the data size differs from the region size
     */
    private void writeExact(final IndexDetails indexDetails, DBDataOutput data) throws IOException {
        final int dataSize = data.getSize();
        if (indexDetails.getSize() != dataSize) {
            throw new IOException("Unable to write " + dataSize + " in a space of " + indexDetails.getSize());
        }
        final long offset = indexDetails.getOffset();
        final Output o = new Output(Channels.newOutputStream(dataFileChannel.position(offset)), dataSize);
        data.getWriter().write(o);
        o.flush();
    }

    /**
     * Allocates a fresh region for {@code index}, records the new metadata and writes
     * the data into it.
     */
    private IndexDetails allocateAndWrite(final long index, DBDataOutput w) throws IOException {
        final int size = w.getSize();
        final int type = w.getType();
        final long hash = w.calculateHash();
        final long offset = fileAllocator.allocate(size);
        IndexDetails details = editIndex(index, offset, size, type, hash);
        writeExact(details, w);
        return details;
    }

    /**
     * Deletes {@code index}: frees its data region immediately and queues the on-disk
     * metadata record for erasure (done by clean()/close()).
     */
    @Override
    public void delete(long index) throws IOException {
        checkClosed();
        IndexDetails indexDetails = getIndexMetadataUnsafe(index);
        if (indexDetails != null) {
            fileAllocator.markFree(indexDetails.getOffset(), indexDetails.getSize());
        }
        synchronized (indicesMapsAccessLock) {
            dirtyLoadedIndices.remove(index);
            flushingAllowedIndices.remove(index);
            loadedIndices.remove(index);
            removedIndices.add(index);
        }
    }

    /**
     * Flushes the metadata of {@code index} to disk (if dirty, or erases it if removed)
     * and evicts it from memory.
     */
    public void flushAndUnload(long index) throws IOException {
        // FIX: check-and-clear of removedIndices is now atomic under the lock
        // (previously contains() ran outside the synchronized block).
        boolean isRemoved = false;
        synchronized (indicesMapsAccessLock) {
            if (removedIndices.contains(index)) {
                isRemoved = true;
                removedIndices.remove(index);
                dirtyLoadedIndices.remove(index);
                flushingAllowedIndices.remove(index);
                loadedIndices.remove(index);
            }
        }
        if (isRemoved) {
            // Erase the on-disk metadata record of the removed index
            SeekableByteChannel metadata = metadataFileChannel.position(index * IndexDetails.TOTAL_BYTES);
            eraseIndexDetails(metadata);
        }
        boolean isDirty = false;
        IndexDetails indexDetails = null;
        synchronized (indicesMapsAccessLock) {
            if (dirtyLoadedIndices.contains(index)) {
                // FIX: isDirty was never set to true, so dirty metadata was silently
                // dropped on unload instead of being written back.
                isDirty = true;
                indexDetails = loadedIndices.get(index);
                dirtyLoadedIndices.remove(index);
                flushingAllowedIndices.remove(index);
            }
        }
        if (isDirty) {
            // Update indices metadata
            long position = index * IndexDetails.TOTAL_BYTES;
            resizeMetadataFileChannel(position);
            SeekableByteChannel metadata = metadataFileChannel.position(position);
            writeIndexDetails(metadata, indexDetails);
        }
        synchronized (indicesMapsAccessLock) {
            loadedIndices.remove(index);
        }
    }

    /** Returns true if {@code index} exists (loaded or persisted). */
    @Override
    public boolean has(long index) {
        checkClosed();
        try {
            return getIndexMetadataUnsafe(index) != null;
        } catch (IOException ex) {
            ex.printStackTrace();
            return false;
        }
    }

    /**
     * Edit index data if a change is detected
     * @param index index number
     * @param oldData Old index data to check
     * @param offset offset
     * @param size size
     * @param type type
     * @param hash hash
     * @return the current metadata (oldData if nothing changed)
     */
    private IndexDetails editIndex(long index, IndexDetails oldData, long offset, int size, int type, long hash) {
        if (oldData.getOffset() != offset || oldData.getSize() != size
                || oldData.getType() != type || oldData.getHash() != hash) {
            return editIndex(index, offset, size, type, hash);
        } else {
            return oldData;
        }
    }

    /**
     * Edit index data
     * @param index index number
     * @param offset offset
     * @param size size
     * @param type type
     * @param hash hash
     * @return the new metadata
     */
    private IndexDetails editIndex(long index, long offset, int size, int type, long hash) {
        IndexDetails indexDetails = new IndexDetails(offset, size, type, hash);
        editIndex(index, indexDetails);
        return indexDetails;
    }

    /**
     * Edit index data: records the metadata in memory and marks the index dirty.
     * @param index index number
     * @param details new metadata
     */
    private void editIndex(long index, IndexDetails details) {
        synchronized (indicesMapsAccessLock) {
            loadedIndices.put(index, details);
            dirtyLoadedIndices.add(index);
            flushingAllowedIndices.remove(index);
        }
    }

    /**
     * Registers metadata under a freshly allocated index number and returns it.
     */
    private long createIndexMetadata(IndexDetails indexDetails) {
        synchronized (indicesMapsAccessLock) {
            long newIndex = firstAllocableIndex++;
            loadedIndices.put(newIndex, indexDetails);
            dirtyLoadedIndices.add(newIndex);
            flushingAllowedIndices.remove(newIndex);
            removedIndices.remove(newIndex);
            return newIndex;
        }
    }

    /**
     * Returns the metadata of {@code index}, loading it from the metadata file if it is
     * not in memory, or {@code null} if the index does not exist.
     */
    private IndexDetails getIndexMetadataUnsafe(long index) throws IOException {
        // Return index details if loaded
        IndexDetails details;
        synchronized (indicesMapsAccessLock) {
            details = loadedIndices.getOrDefault(index, null);
        }
        if (details != null) return details;

        // Try to load the details from file
        final long metadataPosition = index * IndexDetails.TOTAL_BYTES;
        if (metadataPosition + IndexDetails.TOTAL_BYTES > getMetadataFileChannelSize()) {
            // Record would lie beyond the end of the file
            return null;
        }
        SeekableByteChannel currentMetadataFileChannel = metadataFileChannel.position(metadataPosition);
        IndexDetails indexDetails = readIndexDetailsAt(currentMetadataFileChannel);
        if (indexDetails != null) {
            // Cache it (marks it dirty; it will simply be rewritten unchanged on flush)
            editIndex(index, indexDetails);
            return indexDetails;
        }
        // No results found. Returning null
        return null;
    }

    /**
     * Reads one metadata record at the channel's current position, advancing it by
     * TOTAL_BYTES. Returns {@code null} for deleted records.
     */
    private IndexDetails readIndexDetailsAt(SeekableByteChannel currentMetadataFileChannel) throws IOException {
        IndexDetails indexDetails = null;
        synchronized (metadataByteBufferLock) {
            metadataByteBuffer.rewind();
            currentMetadataFileChannel.read(metadataByteBuffer);
            metadataByteBuffer.rewind();
            // If it's not deleted continue
            if ((metadataByteBuffer.getInt() & IndexDetails.MASK_DELETED) == 0) {
                // Record layout: flags(int), offset(long), size(int), type(int), hash(long)
                final long offset = metadataByteBuffer.getLong();
                final int size = metadataByteBuffer.getInt();
                final int type = metadataByteBuffer.getInt();
                final long hash = metadataByteBuffer.getLong();
                indexDetails = new IndexDetails(offset, size, type, hash);
            }
        }
        return indexDetails;
    }

    /**
     * Like getIndexMetadataUnsafe but throws if the index does not exist.
     * @throws IOException if the index is not found
     */
    private IndexDetails getIndexMetadata(long index) throws IOException {
        IndexDetails details = getIndexMetadataUnsafe(index);
        if (details == null) throw new IOException("Index " + index + " not found");
        else return details;
    }

    /**
     * Flushes all pending metadata, erases removed records and closes the allocator.
     * Idempotent (double-checked under closeLock).
     */
    @Override
    public void close() throws IOException {
        if (closed) {
            return;
        }
        synchronized (closeLock) {
            if (closed) {
                return;
            }
            closed = true;
        }
        // Update indices metadata
        flushAllFlushableIndices();
        // Remove removed indices
        removeRemovedIndices();
        fileAllocator.close();
    }

    /**
     * Serializes one metadata record at the given channel position.
     */
    private void writeIndexDetails(SeekableByteChannel position, IndexDetails indexDetails) throws IOException {
        synchronized (metadataByteBufferLock) {
            final int size = indexDetails.getSize();
            final int type = indexDetails.getType();
            final long offset = indexDetails.getOffset();
            final long hash = indexDetails.getHash();
            metadataByteBuffer.rewind();
            metadataByteBuffer.putInt(0); // flags: not deleted
            metadataByteBuffer.putLong(offset);
            metadataByteBuffer.putInt(size);
            metadataByteBuffer.putInt(type);
            metadataByteBuffer.putLong(hash);
            metadataByteBuffer.rewind();
            position.write(metadataByteBuffer);
        }
    }

    /**
     * Marks the metadata record at the given channel position as deleted by writing
     * only the flags int.
     */
    private void eraseIndexDetails(SeekableByteChannel position) throws IOException {
        synchronized (maskByteBufferLock) {
            maskByteBuffer.rewind();
            maskByteBuffer.putInt(IndexDetails.MASK_DELETED);
            maskByteBuffer.rewind();
            position.write(maskByteBuffer);
        }
    }

    private void checkClosed() {
        if (closed) {
            throw new RuntimeException("Index Manager is closed.");
        }
    }

    /**
     * Cleaner hook: flushes dirty metadata, erases removed records and unloads excess
     * cached indices.
     *
     * @return the number of cleaned items
     */
    @Override
    public long clean() {
        long cleaned = 0;
        long tim1 = System.currentTimeMillis();
        try {
            cleaned += flushAllFlushableIndices();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
        long tim2 = System.currentTimeMillis();
        try {
            cleaned += removeRemovedIndices();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
        long tim3 = System.currentTimeMillis();
        cleaned += cleanExtraIndices();
        long tim4 = System.currentTimeMillis();
        if (Cleaner.ENABLE_CLEANER_LOGGING)
            System.out.println("[CLEANER] FileIndexManager CLEAN_TIME: "
                    + (tim2 - tim1) + "," + (tim3 - tim2) + "," + (tim4 - tim3));
        return cleaned;
    }

    /**
     * Writes back every dirty index whose flushing is not currently disallowed.
     * Consecutive indices reuse the channel position to avoid redundant seeks.
     *
     * @return the number of flushed indices
     */
    private long flushAllFlushableIndices() throws IOException {
        long flushedIndices = 0;
        SeekableByteChannel metadata = metadataFileChannel;
        long lastIndex = -2;
        synchronized (indicesMapsAccessLock) {
            for (long index : dirtyLoadedIndices) {
                if (!flushingAllowedIndices.contains(index)) {
                    IndexDetails indexDetails = loadedIndices.get(index);
                    long position = index * IndexDetails.TOTAL_BYTES;
                    resizeMetadataFileChannel(position);
                    if (index - lastIndex != 1) {
                        // Non-consecutive index: seek explicitly
                        metadata = metadata.position(position);
                    }
                    writeIndexDetails(metadata, indexDetails);
                    lastIndex = index;
                    flushedIndices++;
                }
            }
            // Indices whose flushing was disallowed stay dirty for the next pass.
            dirtyLoadedIndices.clear();
            dirtyLoadedIndices.addAll(flushingAllowedIndices);
            flushingAllowedIndices.clear();
        }
        return flushedIndices;
    }

    /** Grows the tracked logical metadata size so a record at {@code position} fits. */
    private void resizeMetadataFileChannel(long position) {
        if (position + IndexDetails.TOTAL_BYTES > metadataFileChannelSize) {
            metadataFileChannelSize = position + IndexDetails.TOTAL_BYTES;
        }
    }

    /**
     * Erases the on-disk records of all removed indices.
     *
     * @return the number of erased records
     */
    private long removeRemovedIndices() throws IOException {
        SeekableByteChannel metadata = metadataFileChannel;
        synchronized (indicesMapsAccessLock) {
            long removed = this.removedIndices.size();
            for (long index : this.removedIndices) {
                metadata = metadata.position(index * IndexDetails.TOTAL_BYTES);
                eraseIndexDetails(metadata);
            }
            this.removedIndices.clear();
            return removed;
        }
    }

    /**
     * Unloads the oldest cached indices until the cache is back under
     * 3/2 * MAX_LOADED_INDICES (only triggered above MAX_LOADED_INDICES).
     *
     * @return the number of unloaded indices
     */
    private long cleanExtraIndices() {
        long removedIndices = 0;
        LongArrayList toUnload = new LongArrayList();
        synchronized (indicesMapsAccessLock) {
            if (loadedIndices.size() > JCWDatabase.MAX_LOADED_INDICES) {
                long count = loadedIndices.size();
                LongIterator it = loadedIndices.keySet().iterator();
                while (it.hasNext()) {
                    long loadedIndex = it.nextLong();
                    if (count < JCWDatabase.MAX_LOADED_INDICES * 3L / 2L) {
                        break;
                    }
                    toUnload.add(loadedIndex);
                    removedIndices++;
                    count--;
                }
            }
        }
        // FIX: iterate by size(), not elements() — elements() exposes the raw backing
        // array, whose trailing slots beyond size() are garbage (previously index 0
        // could be spuriously flushed/unloaded).
        for (int i = 0; i < toUnload.size(); i++) {
            try {
                flushAndUnload(toUnload.getLong(i));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return removedIndices;
    }
}