package it.cavallium.strangedb.java.objects.lists; import it.cavallium.strangedb.VariableWrapper; import it.cavallium.strangedb.functionalinterfaces.ConsumerWithIO; import it.unimi.dsi.fastutil.longs.LongArrayList; import it.cavallium.strangedb.java.objects.EnhancedObject; import it.cavallium.strangedb.java.database.IDatabaseTools; import java.io.IOError; import java.io.IOException; import java.util.StringJoiner; import java.util.concurrent.*; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; public abstract class StrangeDbList extends EnhancedObject implements ElementsList { private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(false); protected abstract LongArrayList getIndices(); public StrangeDbList() { } public StrangeDbList(IDatabaseTools databaseTools) throws IOException { super(databaseTools); } @Override public T get(int index) throws IOException { readWriteLock.readLock().lock(); try { long uid = getIndices().getLong(index); return loadItem(uid); } finally { readWriteLock.readLock().unlock(); } } @Override public void add(T value) throws IOException { long uid = databaseTools.getObjectsIO().newNullObject(); readWriteLock.writeLock().lock(); try { getIndices().add(uid); writeItemToDisk(uid, value); } finally { readWriteLock.writeLock().unlock(); } } @Override public void forEachParallelUnsorted(ConsumerWithIO action) throws IOException { readWriteLock.readLock().lock(); try { forEachParallelUnsorted_(action); } finally { readWriteLock.readLock().unlock(); } } protected void forEachParallelUnsorted_(ConsumerWithIO action) throws IOException { try { int size = size(); ExecutorService executorService = Executors.newFixedThreadPool(ForkJoinPool.getCommonPoolParallelism(), (r) -> new Thread(r, "DBList parallel foreach")); VariableWrapper exceptionVariableWrapper = new VariableWrapper<>(null); for (int i = 0; i < size; i++) { final int index = i; executorService.execute(() -> { try { T t = get(index); action.accept(t); } catch (IOException e) { if (exceptionVariableWrapper.var == null) exceptionVariableWrapper.var = e; } }); } executorService.shutdown(); executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); if (exceptionVariableWrapper.var != null) { throw exceptionVariableWrapper.var; } executorService.shutdownNow(); } catch (InterruptedException e) { throw new IOException(e); } catch (CompletionException e) { throw new IOException(e.getCause()); } } public void forEachIndexParallelUnsorted(ConsumerWithIO action) throws IOException { readWriteLock.readLock().lock(); try { forEachIndexParallelUnsorted_(action); } finally { readWriteLock.readLock().unlock(); } } protected void forEachIndexParallelUnsorted_(ConsumerWithIO action) throws IOException { try { this.getIndices().parallelStream().forEach((id) -> { try { action.accept(id); } catch (IOException e) { throw new CompletionException(e); } }); } catch (CompletionException ex) { throw new IOException(ex.getCause()); } } public void forEach(ConsumerWithIO action) throws IOException { readWriteLock.readLock().lock(); try { forEach_(action); } finally { readWriteLock.readLock().unlock(); } } protected void forEach_(ConsumerWithIO action) throws IOException { int size = size(); for (int i = 0; i < size; i++) { final int index = i; T value = get(index); action.accept(value); } } @Override public void update(int index, T value) throws IOException { set(index, value); } @Override public void set(int index, T value) throws IOException { long uid = databaseTools.getObjectsIO().newNullObject(); readWriteLock.writeLock().lock(); try { getIndices().set(index, uid); writeItemToDisk(uid, value); } finally { readWriteLock.writeLock().unlock(); } } @Override public void add(int index, T value) throws IOException { long uid = databaseTools.getObjectsIO().newNullObject(); readWriteLock.writeLock().lock(); try { getIndices().add(index, uid); writeItemToDisk(uid, value); } finally { readWriteLock.writeLock().unlock(); } } public T getLast() throws IOException { readWriteLock.readLock().lock(); try { if (getIndices().size() > 0) { return get(getIndices().size() - 1); } else { return null; } } finally { readWriteLock.readLock().unlock(); } } public boolean isEmpty() { readWriteLock.readLock().lock(); try { return getIndices().size() <= 0; } finally { readWriteLock.readLock().unlock(); } } public int size() { readWriteLock.readLock().lock(); try { return getIndices().size(); } finally { readWriteLock.readLock().unlock(); } } protected abstract T loadItem(long uid) throws IOException; protected abstract void writeItemToDisk(long uid, T item) throws IOException; @Override public String toString() { return new StringJoiner(", ", StrangeDbList.class.getSimpleName() + "[", "]") .add(getIndices().size() + " items") .toString(); } }