Multidatabase
This commit is contained in:
parent f030a96608
commit 7a4dd5757c
it/cavallium/strangedb/database/DatabaseCore.java

@@ -1,7 +1,6 @@
 package it.cavallium.strangedb.database;
 
-import it.cavallium.strangedb.database.references.DatabaseReferencesIO;
-import it.cavallium.strangedb.database.references.DatabaseReferencesMetadata;
+import it.cavallium.strangedb.database.references.ConcurrentDatabaseReferencesIO;
 import it.cavallium.strangedb.database.references.ReferenceInfo;
 
 import java.io.IOException;
@@ -10,29 +9,19 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 
-import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.NONEXISTENT_REFERENCE_INFO;
+import static it.cavallium.strangedb.database.references.ConcurrentDatabaseReferencesMetadata.NONEXISTENT_REFERENCE_INFO;
 
 public class DatabaseCore implements IDatabase {
 
-	private final DatabaseFileIO fileIO;
-	protected final DatabaseReferencesIO referencesIO;
-	protected final DatabaseReferencesMetadata referencesMetadata;
+	protected final ConcurrentDatabaseReferencesIO referencesIO;
 	protected final Path dataFile;
 	protected final Path referencesMetaFile;
 	protected volatile boolean closed;
 
 	public DatabaseCore(Path dataFile, Path referencesMetaFile) throws IOException {
 		if (Files.notExists(dataFile)) {
 			Files.createFile(dataFile);
 		}
 		if (Files.notExists(referencesMetaFile)) {
 			Files.createFile(referencesMetaFile);
 		}
 		this.dataFile = dataFile;
 		this.referencesMetaFile = referencesMetaFile;
-		this.fileIO = new DatabaseFileIO(dataFile);
-		this.referencesMetadata = new DatabaseReferencesMetadata(referencesMetaFile);
-		this.referencesIO = new DatabaseReferencesIO(fileIO, referencesMetadata);
+		this.referencesIO = new ConcurrentDatabaseReferencesIO(dataFile, referencesMetaFile, 10, 10);
 	}
 
 	@Override
@@ -40,8 +29,7 @@ public class DatabaseCore implements IDatabase {
 		if (this.closed) {
 			throw new IOException("The database has been already closed!");
 		}
-		this.referencesMetadata.close();
-		this.fileIO.close();
+		this.referencesIO.close();
 		this.closed = true;
 	}
 
@@ -66,12 +54,12 @@ public class DatabaseCore implements IDatabase {
 		DatabaseCore databaseToClean = new DatabaseCore(newDataFile, newReferencesFile);
 		DatabaseCore newDatabase = new DatabaseCore(dataFile, referencesMetaFile);
 
-		long referencesCount = databaseToClean.referencesMetadata.getFirstFreeReference();
+		long referencesCount = databaseToClean.referencesIO.getFirstFreeReference();
 		long writtenReferences = 0;
 
 		for (int referenceID = 0; referenceID < referencesCount; referenceID++) {
 			try {
-				ReferenceInfo ref = databaseToClean.referencesMetadata.getReferenceInfo(referenceID);
+				ReferenceInfo ref = databaseToClean.referencesIO.getReferenceInfo(referenceID);
 				if (!NONEXISTENT_REFERENCE_INFO.equals(ref)) {
 					ByteBuffer buffer = databaseToClean.referencesIO.readFromReference(referenceID);
 					newDatabase.referencesIO.writeToReference(referenceID, ref.getCleanerId(), buffer.limit(), buffer);
it/cavallium/strangedb/database/DatabaseFileIO.java (deleted)

@@ -1,94 +0,0 @@
-package it.cavallium.strangedb.database;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousFileChannel;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class DatabaseFileIO implements IFileIO {
-
-	private final AsynchronousFileChannel dataFileChannel;
-	private final AtomicLong firstFreeIndex;
-	private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
-	private volatile boolean closed = false;
-
-	public DatabaseFileIO(Path dataFile) throws IOException {
-		dataFileChannel = AsynchronousFileChannel.open(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
-		firstFreeIndex = new AtomicLong(dataFileChannel.size());
-	}
-
-	@Override
-	public ByteBuffer readAt(long index, int length) throws IOException {
-		lock.readLock().lock();
-		try {
-			if (closed) throw new IOException("Database closed!");
-			ByteBuffer dataBuffer = ByteBuffer.allocate(length);
-			try {
-				dataFileChannel.read(dataBuffer, index).get();
-			} catch (InterruptedException e) {
-				throw new IOException(e);
-			} catch (ExecutionException e) {
-				throw new IOException(e.getCause());
-			}
-			dataBuffer.flip();
-			return dataBuffer;
-		} finally {
-			lock.readLock().unlock();
-		}
-	}
-
-	@Override
-	public int writeAt(long index, int length, ByteBuffer data) throws IOException {
-		lock.writeLock().lock();
-		try {
-			return writeAt_(index, length, data);
-		} finally {
-			lock.writeLock().unlock();
-		}
-	}
-
-	private int writeAt_(long index, int length, ByteBuffer data) throws IOException {
-		if (closed) throw new IOException("Database closed!");
-		if (data.position() != 0) {
-			throw new IOException("You didn't flip the ByteBuffer!");
-		}
-		firstFreeIndex.updateAndGet((firstFreeIndex) -> firstFreeIndex < index + length ? index + length : firstFreeIndex);
-		try {
-			return dataFileChannel.write(data, index).get();
-		} catch (InterruptedException e) {
-			throw new IOException(e);
-		} catch (ExecutionException e) {
-			throw new IOException(e.getCause());
-		}
-	}
-
-	@Override
-	public long writeAtEnd(int length, ByteBuffer data) throws IOException {
-		lock.writeLock().lock();
-		try {
-			if (closed) throw new IOException("Database closed!");
-			long index = firstFreeIndex.getAndAdd(length);
-			writeAt_(index, length, data);
-			return index;
-		} finally {
-			lock.writeLock().unlock();
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
-		lock.writeLock().lock();
-		try {
-			if (closed) throw new IOException("Database already closed!");
-			closed = true;
-			dataFileChannel.close();
-		} finally {
-			lock.writeLock().unlock();
-		}
-	}
-}
it/cavallium/strangedb/database/IFileIO.java → IIO.java (interface renamed)

@@ -3,14 +3,14 @@ package it.cavallium.strangedb.database;
 import java.io.*;
 import java.nio.ByteBuffer;
 
-public interface IFileIO {
+public interface IIO {
 	/**
 	 * Read *length* bytes in position *index*
 	 * @param index index
 	 * @param length length
 	 * @return bytes
 	 */
-	ByteBuffer readAt(long index, int length) throws IOException;
+	ByteBuffer readAt(int hash, long index, int length) throws IOException;
 
 	/**
 	 * Write *length* bytes in position *index*
@@ -18,7 +18,7 @@ public interface IFileIO {
 	 * @param length length
 	 * @param data bytes
 	 */
-	int writeAt(long index, int length, ByteBuffer data) throws IOException;
+	int writeAt(int hash, long index, int length, ByteBuffer data) throws IOException;
 
 	/**
 	 * Write *length* bytes in position *index*
@@ -26,7 +26,7 @@ public interface IFileIO {
 	 * @param data bytes
 	 * @return index
 	 */
-	long writeAtEnd(int length, ByteBuffer data) throws IOException;
+	long writeAtEnd(int hash, int length, ByteBuffer data) throws IOException;
 
 	/**
 	 * Close the file

it/cavallium/strangedb/database/IReferencesIO.java

@@ -33,4 +33,9 @@ public interface IReferencesIO {
 	 * @return bytes
 	 */
 	ByteBuffer readFromReference(long reference) throws IOException;
+
+	/**
+	 * Closes the references
+	 */
+	void close() throws IOException;
 }
it/cavallium/strangedb/database/IReferencesMetadata.java

@@ -15,13 +15,21 @@ public interface IReferencesMetadata {
 	ReferenceInfo getReferenceInfo(long reference) throws IOException;
 
 	/**
-	 * Allocate a new reference
+	 * Allocate a new reference number
 	 *
-	 * @param index index
-	 * @param size size
 	 * @return reference
 	 */
-	long newReference(long index, int size) throws IOException;
+	long allocateReference() throws IOException;
+
+	/**
+	 * Writes the metadata of a newly allocated reference
+	 *
+	 * @param newlyAllocatedReference newly allocated reference
+	 * @param index index
+	 * @param size size
+	 * @return newlyAllocatedReference
+	 */
+	long newReference(long newlyAllocatedReference, long index, int size) throws IOException;
 
 	/**
 	 * Change reference
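Note: the change above splits the old single-step newReference(index, size) into a reserve-then-publish pair. A minimal sketch of the intended call order, assuming an open IReferencesMetadata metadata and IIO fileIO (the local names are illustrative, not from this commit):

// 1. Reserve a reference number; nothing is written yet.
long reference = metadata.allocateReference();
// 2. Write the payload; the reference number doubles as the hash
//    that picks the backing file in the new multi-file layer.
long index = fileIO.writeAtEnd((int) reference, size, data);
// 3. Publish the metadata for the reserved number.
metadata.newReference(reference, index, size);

This mirrors what ConcurrentDatabaseReferencesIO.allocateReference(int, ByteBuffer) does further down in this commit.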
it/cavallium/strangedb/database/io/ConcurrentMultiFileIO.java (new file)

@@ -0,0 +1,88 @@
+package it.cavallium.strangedb.database.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class ConcurrentMultiFileIO implements IMultiFileIO {
+
+	public final int filesCount;
+	private final SimpleFileReaderWriter[] fileRW;
+	private ReentrantReadWriteLock[] locks;
+
+	public ConcurrentMultiFileIO(Path dataFile, int filesCount) throws IOException {
+		this.filesCount = filesCount;
+		String fileName = dataFile.toFile().getName();
+
+		fileRW = new SimpleFileReaderWriter[filesCount];
+		locks = new ReentrantReadWriteLock[filesCount];
+
+		for (int i = 0; i < filesCount; i++) {
+			locks[i] = new ReentrantReadWriteLock(false);
+			fileRW[i] = new SimpleFileReaderWriter(dataFile.resolveSibling(ExtensionUtils.insertExtension(fileName, i)));
+		}
+	}
+
+	@Override
+	public int getFilesCount() {
+		return filesCount;
+	}
+
+	@Override
+	public ByteBuffer readAt(int fileIndex, long index, int length) throws IOException {
+		locks[fileIndex].readLock().lock();
+		try {
+			return fileRW[fileIndex].readAt(index, length);
+		} finally {
+			locks[fileIndex].readLock().unlock();
+		}
+	}
+
+	@Override
+	public int writeAt(int fileIndex, long index, int length, ByteBuffer data) throws IOException {
+		locks[fileIndex].writeLock().lock();
+		try {
+			return fileRW[fileIndex].writeAt(index, length, data);
+		} finally {
+			locks[fileIndex].writeLock().unlock();
+		}
+	}
+
+	@Override
+	public long writeAtEnd(int fileIndex, int length, ByteBuffer data) throws IOException {
+		locks[fileIndex].writeLock().lock();
+		try {
+			return fileRW[fileIndex].writeAtEnd(length, data);
+		} finally {
+			locks[fileIndex].writeLock().unlock();
+		}
+	}
+
+	@Override
+	public void close() throws IOException {
+		for (ReentrantReadWriteLock lock : locks) {
+			lock.writeLock().lock();
+		}
+		try {
+			for (SimpleFileReaderWriter file : fileRW) {
+				file.close();
+			}
+		} finally {
+			for (ReentrantReadWriteLock lock : locks) {
+				lock.writeLock().unlock();
+			}
+		}
+	}
+
+	public long getTotalSize() {
+		long totalSize = 0;
+		for (int i = 0; i < fileRW.length; i++) {
+			locks[i].readLock().lock();
+			totalSize += fileRW[i].getSize();
+			locks[i].readLock().unlock();
+		}
+		return totalSize;
+	}
+}
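Note: ConcurrentMultiFileIO replaces the single global ReentrantReadWriteLock of the removed DatabaseFileIO with one lock per backing file, so operations that hit different files no longer serialize. A hedged usage sketch (path and count are illustrative):

import java.nio.ByteBuffer;
import java.nio.file.Paths;

// "data.db" with 10 files yields data.0.db … data.9.db next to it.
ConcurrentMultiFileIO io = new ConcurrentMultiFileIO(Paths.get("data.db"), 10);
ByteBuffer a = ByteBuffer.allocate(Long.BYTES).putLong(1L);
a.flip(); // writeAt() rejects buffers whose position != 0
ByteBuffer b = ByteBuffer.allocate(Long.BYTES).putLong(2L);
b.flip();
// Different fileIndex values take different write locks, so two threads
// issuing these calls would not block each other.
io.writeAtEnd(0, Long.BYTES, a);
io.writeAtEnd(1, Long.BYTES, b);
io.close();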
it/cavallium/strangedb/database/io/DatabaseIO.java (new file)

@@ -0,0 +1,39 @@
+package it.cavallium.strangedb.database.io;
+
+import it.cavallium.strangedb.database.IIO;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DatabaseIO implements IIO {
+
+	private final IMultiFileIO file;
+
+	public DatabaseIO(IMultiFileIO file) throws IOException {
+		this.file = file;
+	}
+
+	@Override
+	public ByteBuffer readAt(int hash, long index, int length) throws IOException {
+		int fileIndex = hash % file.getFilesCount();
+		return file.readAt(fileIndex, index, length);
+	}
+
+	@Override
+	public int writeAt(int hash, long index, int length, ByteBuffer data) throws IOException {
+		int fileIndex = hash % file.getFilesCount();
+		return file.writeAt(fileIndex, index, length, data);
+	}
+
+	@Override
+	public long writeAtEnd(int hash, int length, ByteBuffer data) throws IOException {
+		int fileIndex = hash % file.getFilesCount();
+		return file.writeAtEnd(fileIndex, length, data);
+	}
+
+	@Override
+	public void close() throws IOException {
+		file.close();
+	}
+}
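Note: DatabaseIO is a thin router: the hash parameter added to IIO only selects the backing file via hash % filesCount (so it must be non-negative). Callers in this commit pass the reference number as the hash, which rotates consecutive references across the data files. A small illustration, assuming 10 files and the data.N.db naming produced by ExtensionUtils:

int filesCount = 10;
for (int reference = 0; reference < 12; reference++) {
	// The same computation DatabaseIO performs in readAt/writeAt/writeAtEnd.
	int fileIndex = reference % filesCount;
	System.out.println("reference " + reference + " -> data." + fileIndex + ".db");
}
// reference 0 -> data.0.db, …, reference 9 -> data.9.db, reference 10 -> data.0.db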
it/cavallium/strangedb/database/io/ExtensionUtils.java (new file)

@@ -0,0 +1,11 @@
+package it.cavallium.strangedb.database.io;
+
+public class ExtensionUtils {
+	public static String insertExtension(String fileName, String extensionToInsert) {
+		return fileName.replaceFirst("\\.", "." + extensionToInsert + ".");
+	}
+
+	public static String insertExtension(String fileName, int extensionToInsert) {
+		return fileName.replaceFirst("\\.", "." + extensionToInsert + ".");
+	}
+}
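Note: insertExtension splices the index before the first dot of the file name, which is how every multi-file class in this commit derives its per-file names. A quick check of the scheme (hypothetical name):

// Prints "references.2.db"; a name without any dot is returned unchanged,
// since replaceFirst() finds nothing to replace.
System.out.println(ExtensionUtils.insertExtension("references.db", 2));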
it/cavallium/strangedb/database/io/IMultiFileIO.java (new file)

@@ -0,0 +1,40 @@
+package it.cavallium.strangedb.database.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface IMultiFileIO {
+	/**
+	 * @return the files count
+	 */
+	int getFilesCount();
+
+	/**
+	 * Read *length* bytes in position *index*
+	 * @param index index
+	 * @param length length
+	 * @return bytes
+	 */
+	ByteBuffer readAt(int fileIndex, long index, int length) throws IOException;
+
+	/**
+	 * Write *length* bytes in position *index*
+	 * @param index index
+	 * @param length length
+	 * @param data bytes
+	 */
+	int writeAt(int fileIndex, long index, int length, ByteBuffer data) throws IOException;
+
+	/**
+	 * Write *length* bytes in position *index*
+	 * @param length length
+	 * @param data bytes
+	 * @return index
+	 */
+	long writeAtEnd(int fileIndex, int length, ByteBuffer data) throws IOException;
+
+	/**
+	 * Close the file
+	 */
+	void close() throws IOException;
+}
it/cavallium/strangedb/database/io/SimpleFileReaderWriter.java (new file)

@@ -0,0 +1,79 @@
+package it.cavallium.strangedb.database.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class SimpleFileReaderWriter {
+
+	private final AsynchronousFileChannel dataFileChannel;
+	private final AtomicLong firstFreeIndex;
+	private volatile boolean closed = false;
+
+	public SimpleFileReaderWriter(Path dataFile) throws IOException {
+		String fileName = dataFile.toFile().getName();
+
+		dataFile = dataFile.resolveSibling(fileName);
+		if (Files.notExists(dataFile)) {
+			Files.createFile(dataFile);
+		}
+
+		dataFileChannel = AsynchronousFileChannel.open(dataFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
+		firstFreeIndex = new AtomicLong(dataFileChannel.size());
+	}
+
+	public ByteBuffer readAt(long index, int length) throws IOException {
+		if (closed) throw new IOException("Database closed!");
+		ByteBuffer dataBuffer = ByteBuffer.allocate(length);
+		try {
+			dataFileChannel.read(dataBuffer, index).get();
+		} catch (InterruptedException e) {
+			throw new IOException(e);
+		} catch (ExecutionException e) {
+			throw new IOException(e.getCause());
+		}
+		dataBuffer.flip();
+		return dataBuffer;
+	}
+
+	public int writeAt(long index, int length, ByteBuffer data) throws IOException {
+		if (closed) throw new IOException("Database closed!");
+		if (data.position() != 0) {
+			throw new IOException("You didn't flip the ByteBuffer!");
+		}
+		firstFreeIndex.updateAndGet((firstFreeIndex) -> firstFreeIndex < index + length ? index + length : firstFreeIndex);
+		try {
+			return dataFileChannel.write(data, index).get();
+		} catch (InterruptedException e) {
+			throw new IOException(e);
+		} catch (ExecutionException e) {
+			throw new IOException(e.getCause());
+		}
+	}
+
+	public long writeAtEnd(int length, ByteBuffer data) throws IOException {
+		if (closed) throw new IOException("Database closed!");
+		long index = firstFreeIndex.getAndAdd(length);
+		writeAt(index, length, data);
+		return index;
+	}
+
+	public void close() throws IOException {
+		if (closed) throw new IOException("Database already closed!");
+		closed = true;
+		dataFileChannel.close();
+	}
+
+	public boolean isClosed() {
+		return closed;
+	}
+
+	public long getSize() {
+		return firstFreeIndex.get();
+	}
+}
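Note: SimpleFileReaderWriter drops DatabaseFileIO's internal locking (callers such as ConcurrentMultiFileIO now lock externally) but keeps the flipped-buffer contract: writeAt rejects any buffer whose position is non-zero. A minimal sketch of correct buffer preparation, given an open SimpleFileReaderWriter rw (hypothetical):

ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
buf.putLong(42L); // position is now 8
buf.flip();       // position 0, limit 8: accepted by writeAt()
long index = rw.writeAtEnd(Long.BYTES, buf); // appends and returns the write offset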
it/cavallium/strangedb/database/io/SimpleMultiFileIO.java (new file)

@@ -0,0 +1,49 @@
+package it.cavallium.strangedb.database.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+
+public class SimpleMultiFileIO implements IMultiFileIO {
+
+	public final int filesCount;
+	private final SimpleFileReaderWriter[] fileRW;
+
+	public SimpleMultiFileIO(Path dataFile, int filesCount) throws IOException {
+		this.filesCount = filesCount;
+		String fileName = dataFile.toFile().getName();
+
+		fileRW = new SimpleFileReaderWriter[filesCount];
+
+		for (int i = 0; i < filesCount; i++) {
+			fileRW[i] = new SimpleFileReaderWriter(dataFile.resolveSibling(ExtensionUtils.insertExtension(fileName, i)));
+		}
+	}
+
+	@Override
+	public int getFilesCount() {
+		return filesCount;
+	}
+
+	@Override
+	public ByteBuffer readAt(int fileIndex, long index, int length) throws IOException {
+		return fileRW[fileIndex].readAt(index, length);
+	}
+
+	@Override
+	public int writeAt(int fileIndex, long index, int length, ByteBuffer data) throws IOException {
+		return fileRW[fileIndex].writeAt(index, length, data);
+	}
+
+	@Override
+	public long writeAtEnd(int fileIndex, int length, ByteBuffer data) throws IOException {
+		return fileRW[fileIndex].writeAtEnd(length, data);
+	}
+
+	@Override
+	public void close() throws IOException {
+		for (SimpleFileReaderWriter file : fileRW) {
+			file.close();
+		}
+	}
+}
it/cavallium/strangedb/database/references/ConcurrentDatabaseReferencesIO.java (new file)

@@ -0,0 +1,85 @@
+package it.cavallium.strangedb.database.references;
+
+import it.cavallium.strangedb.database.io.ConcurrentMultiFileIO;
+import it.cavallium.strangedb.database.io.DatabaseIO;
+import it.cavallium.strangedb.database.IReferencesIO;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+
+public class ConcurrentDatabaseReferencesIO implements IReferencesIO {
+
+	private final DatabaseIO fileIO;
+	private final ConcurrentDatabaseReferencesMetadata referencesMetadata;
+
+	public ConcurrentDatabaseReferencesIO(Path dataFile, Path referencesMetaFile, int dataFilesCount, int referencesFilesCount) throws IOException {
+		this.fileIO = new DatabaseIO(new ConcurrentMultiFileIO(dataFile, dataFilesCount));
+		this.referencesMetadata = new ConcurrentDatabaseReferencesMetadata(referencesMetaFile, referencesFilesCount);
+	}
+
+	@Override
+	public long allocateReference() throws IOException {
+		return referencesMetadata.newReference(referencesMetadata.allocateReference(), 0, 0);
+	}
+
+	@Override
+	public long allocateReference(int size, ByteBuffer data) throws IOException {
+		long reference = referencesMetadata.allocateReference();
+		long index = writeToFile(reference, size, data);
+		return referencesMetadata.newReference(reference, index, size);
+	}
+
+	@Override
+	public void writeToReference(long reference, byte cleanerId, int size, ByteBuffer data) throws IOException {
+		long index = writeToFile(reference, size, data);
+		referencesMetadata.editReference(reference, new ReferenceInfo(index, size, cleanerId));
+	}
+
+	@Override
+	public ByteBuffer readFromReference(long reference) throws IOException {
+		ReferenceInfo referenceInfo = referencesMetadata.getReferenceInfo(reference);
+		return fileIO.readAt((int) reference, referenceInfo.getIndex(), referenceInfo.getSize());
+	}
+
+	@Override
+	public void close() throws IOException {
+		this.referencesMetadata.close();
+		this.fileIO.close();
+	}
+
+	public ByteBuffer readFromReferenceSizeAndLastElementOfReferencesList(long reference) throws IOException {
+		ReferenceInfo referenceInfo = referencesMetadata.getReferenceInfo(reference);
+		if (referenceInfo.getSize() >= Integer.BYTES * 2 + Long.BYTES) {
+			return fileIO.readAt((int) reference, referenceInfo.getIndex() + referenceInfo.getSize() - (Integer.BYTES + Long.BYTES), Integer.BYTES + Long.BYTES);
+		} else {
+			return fileIO.readAt((int) reference, referenceInfo.getIndex(), referenceInfo.getSize());
+		}
+	}
+
+	/**
+	 *
+	 * @param size
+	 * @param data
+	 * @return index
+	 * @throws IOException
+	 */
+	private long writeToFile(long hash, int size, ByteBuffer data) throws IOException {
+		if (size == 0 && data == null) return 0;
+		if (size < 0) {
+			throw new IOException("Trying to create a block with size " + size);
+		}
+		if (data.limit() < size) {
+			throw new IOException("Trying to create a block with size " + size + " but with a buffer of size " + data.limit());
+		}
+		return fileIO.writeAtEnd((int) hash, size, data);
+	}
+
+	public ReferenceInfo getReferenceInfo(long reference) throws IOException {
+		return referencesMetadata.getReferenceInfo(reference);
+	}
+
+	public long getFirstFreeReference() {
+		return referencesMetadata.getFirstFreeReference();
+	}
+}
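Note: a hedged end-to-end sketch of the new façade, mirroring what DatabaseCore now constructs (paths are illustrative; the backing files are created on first use):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;

// 10 data files and 10 metadata files, the counts DatabaseCore hard-codes.
ConcurrentDatabaseReferencesIO io = new ConcurrentDatabaseReferencesIO(
		Paths.get("data.db"), Paths.get("references.db"), 10, 10);
ByteBuffer payload = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
long reference = io.allocateReference(payload.limit(), payload);
System.out.println(StandardCharsets.UTF_8.decode(io.readFromReference(reference)));
io.close();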
it/cavallium/strangedb/database/references/DatabaseReferencesMetadata.java → ConcurrentDatabaseReferencesMetadata.java (class renamed)

@@ -1,34 +1,39 @@
 package it.cavallium.strangedb.database.references;
 
 import it.cavallium.strangedb.database.IReferencesMetadata;
+import it.cavallium.strangedb.database.io.ConcurrentMultiFileIO;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousFileChannel;
 import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.concurrent.ExecutionException;
+import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static it.cavallium.strangedb.database.IDatabase.DISK_BLOCK_SIZE;
 
-public class DatabaseReferencesMetadata implements IReferencesMetadata {
+public class ConcurrentDatabaseReferencesMetadata implements IReferencesMetadata {
 	public static final byte ERRORED_CLEANER = (byte) -1;
 	public static final byte BLANK_DATA_CLEANER = (byte) -2;
 	public static final ReferenceInfo NONEXISTENT_REFERENCE_INFO = new ReferenceInfo(-1, -1, ERRORED_CLEANER);
 	private static final int REF_META_BYTES_COUNT = Long.BYTES + Integer.BYTES + Byte.BYTES;
 	public static final int REF_META_READS_AT_EVERY_READ = (DISK_BLOCK_SIZE - DISK_BLOCK_SIZE % REF_META_BYTES_COUNT) / REF_META_BYTES_COUNT;
+	private static final int SEQUENCE_SIZE = 6;
+	private static final long SEQUENCE_SIZE_L = SEQUENCE_SIZE;
 
-	private final AsynchronousFileChannel metaFileChannel;
+	private final int filesCount;
+	private final long filesCountL;
+	private final ConcurrentMultiFileIO metaFile;
 	private final DatabaseReferencesMetadataCache cache;
 	private AtomicLong firstFreeReference;
 	private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
 
-	public DatabaseReferencesMetadata(Path refMetaFile) throws IOException {
-		metaFileChannel = AsynchronousFileChannel.open(refMetaFile, StandardOpenOption.READ, StandardOpenOption.WRITE);
-		firstFreeReference = new AtomicLong(metaFileChannel.size() / REF_META_BYTES_COUNT);
-		this.cache = new DatabaseReferencesMetadataCache(new Flusher());
+	public ConcurrentDatabaseReferencesMetadata(Path refMetaFile, int filesCount) throws IOException {
+		this.filesCount = filesCount;
+		this.filesCountL = filesCount;
+		metaFile = new ConcurrentMultiFileIO(refMetaFile, this.filesCount);
+		firstFreeReference = new AtomicLong(metaFile.getTotalSize() / REF_META_BYTES_COUNT);
+		this.cache = new DatabaseReferencesMetadataCache(filesCount, new Flusher());
 	}
 
 	@Override
@@ -45,26 +50,20 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata {
 		if ((block = cache.getInfo(reference)) != NONEXISTENT_REFERENCE_INFO) {
 			return block;
 		}
-		long position = reference * REF_META_BYTES_COUNT;
 		int size = REF_META_READS_AT_EVERY_READ * REF_META_BYTES_COUNT;
 		if (reference + (REF_META_READS_AT_EVERY_READ - 1) >= firstFreeReference) {
-			size = (int) ((firstFreeReference - reference) * REF_META_BYTES_COUNT);
+			size = (int) ((firstFreeReference - reference)) * REF_META_BYTES_COUNT;
 		}
 		int referencesCount = size / REF_META_BYTES_COUNT;
 
-		ByteBuffer buffer = ByteBuffer.allocate(size);
-		try {
-			metaFileChannel.read(buffer, position).get();
-		} catch (InterruptedException e) {
-			throw new IOException(e);
-		} catch (ExecutionException e) {
-			throw new IOException(e.getCause());
-		}
-		buffer.flip();
-
 		if (referencesCount < 1) {
 			throw new IOException("Trying to read <1 references");
 		}
 
+		int fileId = toFileId(reference);
+		long localReference = toLocalReference(fileId, reference);
+
+		ByteBuffer buffer = metaFile.readAt(fileId, localReference * REF_META_BYTES_COUNT, size);
+
 		if (buffer.limit() % REF_META_BYTES_COUNT != 0 || buffer.limit() < REF_META_BYTES_COUNT) {
 			throw new IOException("The buffer is smaller than the data requested.");
 		} else if (buffer.limit() != size) {
@@ -77,7 +76,7 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata {
 
 		block = NONEXISTENT_REFERENCE_INFO;
 		for (int delta = 0; delta < referencesCount; delta++) {
-			long referenceToLoad = reference + delta;
+			long referenceToLoad = toReference(fileId, localReference + delta);
 			long currentIndex = buffer.getLong();
 			int currentSize = buffer.getInt();
 			byte cleanerId = buffer.get();
@@ -109,11 +108,14 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata {
 	}
 
 	@Override
-	public long newReference(long index, int size) throws IOException {
-		long newReference;
-		newReference = firstFreeReference.getAndIncrement();
-		cache.put(newReference, new ReferenceInfo(index, size, BLANK_DATA_CLEANER), true, true);
-		return newReference;
+	public long allocateReference() throws IOException {
+		return firstFreeReference.getAndIncrement();
+	}
+
+	@Override
+	public long newReference(long newlyAllocatedReference, long index, int size) throws IOException {
+		cache.put(newlyAllocatedReference, new ReferenceInfo(index, size, BLANK_DATA_CLEANER), true, true);
+		return newlyAllocatedReference;
 	}
 
 	@Override
@@ -141,7 +143,7 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata {
 		lock.writeLock().lock();
 		try {
 			cache.close();
-			metaFileChannel.close();
+			metaFile.close();
 		} finally {
 			lock.writeLock().unlock();
 		}
@@ -172,61 +174,65 @@ public class DatabaseReferencesMetadata implements IReferencesMetadata {
 			data.putInt(info.getSize());
 			data.put(info.getCleanerId());
 			data.flip();
-			try {
-				metaFileChannel.write(data, reference * REF_META_BYTES_COUNT).get();
-			} catch (InterruptedException e) {
-				throw new IOException(e);
-			} catch (ExecutionException e) {
-				throw new IOException(e.getCause());
-			}
+
+			int fileIndex = toFileId(reference);
+			long localReference = toLocalReference(fileIndex, reference);
+			metaFile.writeAt(fileIndex, localReference * REF_META_BYTES_COUNT, REF_META_BYTES_COUNT, data);
 		}
 
 		@Override
 		public void flushMultiple(long referenceStart, ReferenceInfo[] infos) throws IOException {
-			ByteBuffer data = ByteBuffer.allocate(infos.length * REF_META_BYTES_COUNT);
-			for (ReferenceInfo info : infos) {
+			long[] startLocalReferences = new long[filesCount];
+			int[] referencesCount = new int[filesCount];
+			Arrays.fill(startLocalReferences, -1);
+			ByteBuffer[] buffers = new ByteBuffer[filesCount];
+			int maxBufferSize = (infos.length > SEQUENCE_SIZE ? (infos.length / filesCount) + SEQUENCE_SIZE : SEQUENCE_SIZE) * REF_META_BYTES_COUNT;
+			for (int i = 0; i < buffers.length; i++) {
+				buffers[i] = ByteBuffer.allocate(maxBufferSize);
+			}
+
+			for (int delta = 0; delta < infos.length; delta++) {
+				ReferenceInfo info = infos[delta];
+				long reference = referenceStart + delta;
+				int fileId = toFileId(reference);
+				long localReference = toLocalReference(fileId, reference);
+				if (startLocalReferences[fileId] == -1) {
+					startLocalReferences[fileId] = localReference;
+				}
+				referencesCount[fileId]++;
 				if (info.getCleanerId() == ERRORED_CLEANER) {
 					throw new IOException("Passing a cleaner with the id of ERRORED_CLIENT");
 				}
-				data.putLong(info.getIndex());
-				data.putInt(info.getSize());
-				data.put(info.getCleanerId());
+				if (buffers[fileId].remaining() < REF_META_BYTES_COUNT) {
+					throw new IOException();
+				}
+				buffers[fileId].putLong(info.getIndex());
+				buffers[fileId].putInt(info.getSize());
+				buffers[fileId].put(info.getCleanerId());
 			}
-			data.flip();
-			try {
-				metaFileChannel.write(data, referenceStart * REF_META_BYTES_COUNT).get();
-			} catch (InterruptedException e) {
-				throw new IOException(e);
-			} catch (ExecutionException e) {
-				throw new IOException(e.getCause());
-			}
+			for (int fileId = 0; fileId < filesCount; fileId++) {
+				if (referencesCount[fileId] > 0) {
+					buffers[fileId].flip();
+
+					long localPosition = startLocalReferences[fileId] * REF_META_BYTES_COUNT;
+
+					metaFile.writeAt(fileId, localPosition, referencesCount[fileId] * REF_META_BYTES_COUNT, buffers[fileId]);
+				}
+			}
 		}
 	}
 
-	/*
-	private void writeReferenceToDisk(long reference, byte cleanerId, long blockId) throws IOException {
-		if (cleanerId == ERRORED_CLEANER) {
-			throw new IOException("Passing a cleaner with the id of ERRORED_CLIENT");
-		}
-		long firstReferenceToWrite = 1 + lastWrittenReference.getAndUpdate((lastWrittenReferenceVal) -> reference > lastWrittenReferenceVal ? reference : lastWrittenReferenceVal);
-		if (firstReferenceToWrite > reference) {
-			firstReferenceToWrite = reference;
-		}
-		ByteBuffer data = ByteBuffer.allocate((int) ((reference + 1 - firstReferenceToWrite) * REF_META_BYTES_COUNT));
-		for (long i = firstReferenceToWrite; i < reference - 1; i++) {
-			data.putLong(ERROR_BLOCK_ID);
-			data.putInt(ERRORED_CLEANER & 0xFF);
-		}
-		data.putLong(blockId);
-		data.putInt(cleanerId & 0xFF);
-		data.flip();
-		try {
-			metaFileChannel.write(data, firstReferenceToWrite * REF_META_BYTES_COUNT).get();
-		} catch (InterruptedException e) {
-			throw new IOException(e);
-		} catch (ExecutionException e) {
-			throw new IOException(e.getCause());
-		}
-	}
-	*/
+
+	private long toReference(int fileId, long localReference) {
+		return (fileId + filesCountL * (localReference / SEQUENCE_SIZE_L)) * SEQUENCE_SIZE_L + (localReference % SEQUENCE_SIZE_L);
+	}
+
+	private int toFileId(long reference) {
+		return (int) ((reference / SEQUENCE_SIZE_L) % filesCountL);
+	}
+
+	private long toLocalReference(int fileId, long reference) {
+		long chunkId = (reference - (reference % SEQUENCE_SIZE_L)) / SEQUENCE_SIZE_L;
+		return (SEQUENCE_SIZE_L * (chunkId - fileId)) / filesCountL + reference % SEQUENCE_SIZE_L;
+	}
 }
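Note: the three helpers added above stripe references across the metadata files in runs of SEQUENCE_SIZE = 6: references 0–5 land in file 0 at local offsets 0–5, references 6–11 in file 1, and so on, wrapping back to file 0 after filesCount × 6 references. The mapping round-trips exactly; a worked example with filesCount = 10:

// reference 62: chunk = 62 / 6 = 10, so toFileId(62) = 10 % 10 = 0
// toLocalReference(0, 62) = (6 * (10 - 0)) / 10 + 62 % 6 = 6 + 2 = 8
// toReference(0, 8) = (0 + 10 * (8 / 6)) * 6 + 8 % 6 = 10 * 6 + 2 = 62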
it/cavallium/strangedb/database/references/DatabaseReferencesIO.java (deleted)

@@ -1,69 +0,0 @@
-package it.cavallium.strangedb.database.references;
-
-import it.cavallium.strangedb.database.DatabaseFileIO;
-import it.cavallium.strangedb.database.IReferencesIO;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class DatabaseReferencesIO implements IReferencesIO {
-
-	private final DatabaseFileIO fileIO;
-	private final DatabaseReferencesMetadata referencesMetadata;
-
-	public DatabaseReferencesIO(DatabaseFileIO fileIO, DatabaseReferencesMetadata referencesMetadata) {
-		this.fileIO = fileIO;
-		this.referencesMetadata = referencesMetadata;
-	}
-
-	@Override
-	public long allocateReference() throws IOException {
-		return referencesMetadata.newReference(0, 0);
-	}
-
-	@Override
-	public long allocateReference(int size, ByteBuffer data) throws IOException {
-		long index = writeToFile(size, data);
-		return referencesMetadata.newReference(index, size);
-	}
-
-	@Override
-	public void writeToReference(long reference, byte cleanerId, int size, ByteBuffer data) throws IOException {
-		long index = writeToFile(size, data);
-		referencesMetadata.editReference(reference, new ReferenceInfo(index, size, cleanerId));
-	}
-
-	@Override
-	public ByteBuffer readFromReference(long reference) throws IOException {
-		ReferenceInfo referenceInfo = referencesMetadata.getReferenceInfo(reference);
-		return fileIO.readAt(referenceInfo.getIndex(), referenceInfo.getSize());
-	}
-
-	public ByteBuffer readFromReferenceSizeAndLastElementOfReferencesList(long reference) throws IOException {
-		ReferenceInfo referenceInfo = referencesMetadata.getReferenceInfo(reference);
-		if (referenceInfo.getSize() >= Integer.BYTES * 2 + Long.BYTES) {
-			return fileIO.readAt(referenceInfo.getIndex() + referenceInfo.getSize() - (Integer.BYTES + Long.BYTES), Integer.BYTES + Long.BYTES);
-		} else {
-			return fileIO.readAt(referenceInfo.getIndex(), referenceInfo.getSize());
-		}
-	}
-
-	/**
-	 *
-	 * @param size
-	 * @param data
-	 * @return index
-	 * @throws IOException
-	 */
-	private long writeToFile(int size, ByteBuffer data) throws IOException {
-		if (size == 0 && data == null) return 0;
-		if (size < 0) {
-			throw new IOException("Trying to create a block with size " + size);
-		}
-		if (data.limit() < size) {
-			throw new IOException("Trying to create a block with size " + size + " but with a buffer of size " + data.limit());
-		}
-		return fileIO.writeAtEnd(size, data);
-	}
-}
it/cavallium/strangedb/database/references/DatabaseReferencesMetadataCache.java

@@ -5,32 +5,40 @@ import it.unimi.dsi.fastutil.objects.ObjectArrayList;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;
 
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.NONEXISTENT_REFERENCE_INFO;
-import static it.cavallium.strangedb.database.references.DatabaseReferencesMetadata.REF_META_READS_AT_EVERY_READ;
+import static it.cavallium.strangedb.database.references.ConcurrentDatabaseReferencesMetadata.NONEXISTENT_REFERENCE_INFO;
+import static it.cavallium.strangedb.database.references.ConcurrentDatabaseReferencesMetadata.REF_META_READS_AT_EVERY_READ;
 
 public class DatabaseReferencesMetadataCache {
 
-	private static final int BASE_QUANTITY = (REF_META_READS_AT_EVERY_READ < 500 ? REF_META_READS_AT_EVERY_READ : 500);
-	private static final int GOOD_CACHE_SIZE = 140 * BASE_QUANTITY;
-	private static final int FLUSH_CACHE_SIZE = 300 * BASE_QUANTITY;
-	private static final int MAX_CACHE_SIZE = 400 * BASE_QUANTITY;
-	private static final int MODIFIED_FLUSH_CACHE_SIZE = 160 * BASE_QUANTITY;
-	private static final int MODIFIED_MAX_CACHE_SIZE = 260 * BASE_QUANTITY;
+	private final int BASE_QUANTITY;
+	private final int GOOD_CACHE_SIZE;
+	private final int FLUSH_CACHE_SIZE;
+	private final int MAX_CACHE_SIZE;
+	private final int MODIFIED_FLUSH_CACHE_SIZE;
+	private final int MODIFIED_MAX_CACHE_SIZE;
 
-	private final Long2ObjectMap<ReferenceInfo> referencesInfos = Long2ObjectMaps.synchronize(new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE, 0.5f));
-	private final Long2ObjectMap<ReferenceInfo> modifiedInfos = Long2ObjectMaps.synchronize(new Long2ObjectLinkedOpenHashMap<>(MODIFIED_MAX_CACHE_SIZE, 0.5f));
+	private final Long2ObjectMap<ReferenceInfo> referencesInfos;
+	private final Long2ObjectMap<ReferenceInfo> modifiedInfos;
 	private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false);
 	private final DatabaseReferencesMetadataCacheFlusher flusher;
 	private volatile boolean closed;
 	ExecutorService flushExecutorService = Executors.newFixedThreadPool(ForkJoinPool.getCommonPoolParallelism(), (r) -> new Thread(r, "References Flush Thread"));
 
-	public DatabaseReferencesMetadataCache(DatabaseReferencesMetadataCacheFlusher flusher) {
+	public DatabaseReferencesMetadataCache(int filesCount, DatabaseReferencesMetadataCacheFlusher flusher) {
+		BASE_QUANTITY = filesCount * (REF_META_READS_AT_EVERY_READ < 500 ? REF_META_READS_AT_EVERY_READ : 500);
+		GOOD_CACHE_SIZE = 140 * BASE_QUANTITY;
+		FLUSH_CACHE_SIZE = 300 * BASE_QUANTITY;
+		MAX_CACHE_SIZE = 400 * BASE_QUANTITY;
+		MODIFIED_FLUSH_CACHE_SIZE = 160 * BASE_QUANTITY;
+		MODIFIED_MAX_CACHE_SIZE = 260 * BASE_QUANTITY;
+
+		referencesInfos = Long2ObjectMaps.synchronize(new Long2ObjectLinkedOpenHashMap<>(MAX_CACHE_SIZE, 0.5f));
+		modifiedInfos = Long2ObjectMaps.synchronize(new Long2ObjectLinkedOpenHashMap<>(MODIFIED_MAX_CACHE_SIZE, 0.5f));
+
 		this.flusher = flusher;
 	}
 
@@ -82,7 +90,7 @@ public class DatabaseReferencesMetadataCache {
 		if (closed) throw new IOException("Cache already closed!");
 		lock.writeLock().lock();
 		try {
-			Long2ObjectMap<ReferenceInfo> referencesBlocksToAdd = new Long2ObjectLinkedOpenHashMap<>(references.length, 0.5f);
+			Long2ObjectMap<ReferenceInfo> referencesBlocksToAdd = new Long2ObjectOpenHashMap<>(references.length, 0.5f);
 			for (int i = 0; i < references.length; i++) {
 				if (infos[i].getCleanerId() != 0) {
 					referencesBlocksToAdd.put(references[i], infos[i]);
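Note: the cache limits that used to be static constants now scale with filesCount, since a read can pull REF_META_READS_AT_EVERY_READ entries from each file. Rough numbers, assuming DISK_BLOCK_SIZE is 4096 (its value is not shown in this diff):

REF_META_BYTES_COUNT         = 8 + 4 + 1 = 13 bytes per reference
REF_META_READS_AT_EVERY_READ = (4096 - 4096 % 13) / 13 = 315
BASE_QUANTITY                = 10 * min(315, 500) = 3150       (filesCount = 10)
MAX_CACHE_SIZE               = 400 * 3150 = 1 260 000 cached ReferenceInfo entries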