strangedb-core/src/main/java/it/cavallium/strangedb/database/references/DatabaseReferencesMetadata....

package it.cavallium.strangedb.database.references;

import it.cavallium.strangedb.database.IReferencesMetadata;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static it.cavallium.strangedb.database.IDatabase.DISK_BLOCK_SIZE;
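
/**
 * Stores one fixed-size metadata record per reference: a long index, an int size
 * and a byte cleaner id (REF_META_BYTES_COUNT = 8 + 4 + 1 = 13 bytes). Records
 * are read from disk in block-aligned batches and kept in a cache that writes
 * back through {@link Flusher}.
 */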
public class DatabaseReferencesMetadata implements IReferencesMetadata {

    public static final byte ERRORED_CLEANER = (byte) -1;
    public static final byte BLANK_DATA_CLEANER = (byte) -2;
    public static final ReferenceInfo NONEXISTENT_REFERENCE_INFO = new ReferenceInfo(-1, -1, ERRORED_CLEANER);
    private static final int REF_META_BYTES_COUNT = Long.BYTES + Integer.BYTES + Byte.BYTES;
    public static final int REF_META_READS_AT_EVERY_READ = (DISK_BLOCK_SIZE - DISK_BLOCK_SIZE % REF_META_BYTES_COUNT) / REF_META_BYTES_COUNT;
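    // REF_META_READS_AT_EVERY_READ is the largest whole number of 13-byte records
    // that fits in one disk block. As a worked example, assuming DISK_BLOCK_SIZE
    // is 4096 (the constant is defined in IDatabase, not shown here):
    // 4096 % 13 = 1, so (4096 - 1) / 13 = 315 records are loaded per read-ahead.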

    private final AsynchronousFileChannel metaFileChannel;
    private final DatabaseReferencesMetadataCache cache;
    private final AtomicLong firstFreeReference;
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);

    public DatabaseReferencesMetadata(Path refMetaFile) throws IOException {
        metaFileChannel = AsynchronousFileChannel.open(refMetaFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
        firstFreeReference = new AtomicLong(metaFileChannel.size() / REF_META_BYTES_COUNT);
        this.cache = new DatabaseReferencesMetadataCache(new Flusher());
    }
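
    /**
     * Returns the metadata of {@code reference}, or NONEXISTENT_REFERENCE_INFO if
     * it was never allocated. On a cache miss, up to REF_META_READS_AT_EVERY_READ
     * consecutive records are loaded from disk in a single read and cached, so
     * sequential lookups mostly hit the cache.
     */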
    @Override
    public ReferenceInfo getReferenceInfo(long reference) throws IOException {
        ReferenceInfo block;
        long[] allReferences;
        ReferenceInfo[] allInfo;
        lock.readLock().lock();
        try {
            long firstFreeReference = this.firstFreeReference.get();
            if (reference >= firstFreeReference) {
                return NONEXISTENT_REFERENCE_INFO;
            }
            if ((block = cache.getInfo(reference)) != NONEXISTENT_REFERENCE_INFO) {
                return block;
            }
            long position = reference * REF_META_BYTES_COUNT;
            int size = REF_META_READS_AT_EVERY_READ * REF_META_BYTES_COUNT;
            if (reference + (REF_META_READS_AT_EVERY_READ - 1) >= firstFreeReference) {
                // Clamp the read-ahead so it does not run past the last allocated reference.
                size = (int) ((firstFreeReference - reference) * REF_META_BYTES_COUNT);
            }
            int referencesCount = size / REF_META_BYTES_COUNT;
            ByteBuffer buffer = ByteBuffer.allocate(size);
            try {
                metaFileChannel.read(buffer, position).get();
            } catch (InterruptedException e) {
                throw new IOException(e);
            } catch (ExecutionException e) {
                throw new IOException(e.getCause());
            }
            buffer.flip();
            if (referencesCount < 1) {
                throw new IOException("Trying to read fewer than 1 reference");
            }
            if (buffer.limit() % REF_META_BYTES_COUNT != 0 || buffer.limit() < REF_META_BYTES_COUNT) {
                throw new IOException("The buffer is smaller than the data requested.");
            } else if (buffer.limit() != size) {
                // The read returned fewer bytes than requested (end of file):
                // shrink the batch to what was actually read.
                size = buffer.limit();
                referencesCount = size / REF_META_BYTES_COUNT;
            }
            allReferences = new long[referencesCount];
            allInfo = new ReferenceInfo[referencesCount];
            block = NONEXISTENT_REFERENCE_INFO;
            for (int delta = 0; delta < referencesCount; delta++) {
                long referenceToLoad = reference + delta;
                long currentIndex = buffer.getLong();
                int currentSize = buffer.getInt();
                byte cleanerId = buffer.get();
                allReferences[delta] = referenceToLoad;
                if (currentIndex != 0xFFFFFFFFFFFFFFFFL) {
                    ReferenceInfo refInfo = new ReferenceInfo(currentIndex, currentSize, cleanerId);
                    allInfo[delta] = refInfo;
                    if (referenceToLoad == reference) {
                        block = refInfo;
                    }
                } else {
                    // An index of -1 marks an erased reference. Cache the shared
                    // NONEXISTENT_REFERENCE_INFO sentinel rather than a fresh
                    // ReferenceInfo, so the identity check above keeps treating
                    // the reference as missing.
                    allInfo[delta] = NONEXISTENT_REFERENCE_INFO;
                }
            }
        } finally {
            lock.readLock().unlock();
        }
        lock.writeLock().lock();
        try {
            cache.putAll(allReferences, allInfo);
        } finally {
            lock.writeLock().unlock();
        }
        return block;
    }
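
    /**
     * Allocates the next free reference id and caches its metadata with
     * BLANK_DATA_CLEANER; the cache is responsible for flushing it to disk.
     */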
    @Override
    public long newReference(long index, int size) throws IOException {
        // Take the write lock like the other mutators: the increment itself is
        // atomic, but the cache update must not race with edits or close().
        lock.writeLock().lock();
        try {
            long newReference = firstFreeReference.getAndIncrement();
            cache.put(newReference, new ReferenceInfo(index, size, BLANK_DATA_CLEANER), true, true);
            return newReference;
        } finally {
            lock.writeLock().unlock();
        }
    }
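
    /** Overwrites the cached metadata of an existing reference under the write lock. */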
    @Override
    public void editReference(long reference, ReferenceInfo info) throws IOException {
        lock.writeLock().lock();
        try {
            cache.put(reference, info, true, false);
        } finally {
            lock.writeLock().unlock();
        }
    }
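
    /**
     * Marks a reference as deleted by caching the NONEXISTENT_REFERENCE_INFO
     * sentinel, so later lookups report it as missing.
     */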
    @Override
    public void deleteReference(long reference) throws IOException {
        lock.writeLock().lock();
        try {
            cache.put(reference, NONEXISTENT_REFERENCE_INFO, true, false);
        } finally {
            lock.writeLock().unlock();
        }
    }

    @Override
    public void close() throws IOException {
        lock.writeLock().lock();
        try {
            cache.close();
            metaFileChannel.close();
        } finally {
            lock.writeLock().unlock();
        }
    }

    @Override
    public long getFirstFreeReference() {
        lock.readLock().lock();
        try {
            return firstFreeReference.get();
        } finally {
            lock.readLock().unlock();
        }
    }
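
    /**
     * Write-back handler used by the cache: serializes each ReferenceInfo into
     * its 13-byte on-disk record at offset reference * REF_META_BYTES_COUNT.
     */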
    private class Flusher implements DatabaseReferencesMetadataCacheFlusher {

        private Flusher() {
        }

        @Override
        public void flush(long reference, ReferenceInfo info, boolean closing) throws IOException {
            if (info.getCleanerId() == ERRORED_CLEANER) {
                throw new IOException("Passing a cleaner with the id of ERRORED_CLEANER");
            }
            ByteBuffer data = ByteBuffer.allocate(REF_META_BYTES_COUNT);
            data.putLong(info.getIndex());
            data.putInt(info.getSize());
            data.put(info.getCleanerId());
            data.flip();
            try {
                metaFileChannel.write(data, reference * REF_META_BYTES_COUNT).get();
            } catch (InterruptedException e) {
                throw new IOException(e);
            } catch (ExecutionException e) {
                throw new IOException(e.getCause());
            }
        }
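
        /**
         * Serializes a contiguous run of records starting at {@code referenceStart}
         * and writes them with a single positioned write, instead of one write
         * per reference.
         */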
        @Override
        public void flushMultiple(long referenceStart, ReferenceInfo[] infos) throws IOException {
            ByteBuffer data = ByteBuffer.allocate(infos.length * REF_META_BYTES_COUNT);
            for (ReferenceInfo info : infos) {
                if (info.getCleanerId() == ERRORED_CLEANER) {
                    throw new IOException("Passing a cleaner with the id of ERRORED_CLEANER");
                }
                data.putLong(info.getIndex());
                data.putInt(info.getSize());
                data.put(info.getCleanerId());
            }
            data.flip();
            try {
                metaFileChannel.write(data, referenceStart * REF_META_BYTES_COUNT).get();
            } catch (InterruptedException e) {
                throw new IOException(e);
            } catch (ExecutionException e) {
                throw new IOException(e.getCause());
            }
        }
    }

    /*
    private void writeReferenceToDisk(long reference, byte cleanerId, long blockId) throws IOException {
        if (cleanerId == ERRORED_CLEANER) {
            throw new IOException("Passing a cleaner with the id of ERRORED_CLEANER");
        }
        long firstReferenceToWrite = 1 + lastWrittenReference.getAndUpdate((lastWrittenReferenceVal) -> reference > lastWrittenReferenceVal ? reference : lastWrittenReferenceVal);
        if (firstReferenceToWrite > reference) {
            firstReferenceToWrite = reference;
        }
        ByteBuffer data = ByteBuffer.allocate((int) ((reference + 1 - firstReferenceToWrite) * REF_META_BYTES_COUNT));
        for (long i = firstReferenceToWrite; i < reference - 1; i++) {
            data.putLong(ERROR_BLOCK_ID);
            data.putInt(ERRORED_CLEANER & 0xFF);
        }
        data.putLong(blockId);
        data.putInt(cleanerId & 0xFF);
        data.flip();
        try {
            metaFileChannel.write(data, firstReferenceToWrite * REF_META_BYTES_COUNT).get();
        } catch (InterruptedException e) {
            throw new IOException(e);
        } catch (ExecutionException e) {
            throw new IOException(e.getCause());
        }
    }
    */
}