package org.warp.jcwdb.ann;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import it.unimi.dsi.fastutil.booleans.BooleanArrayList;
import it.unimi.dsi.fastutil.bytes.ByteArrayList;
import it.unimi.dsi.fastutil.chars.CharArrayList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.shorts.ShortArrayList;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.warp.jcwdb.FileIndexManager;

import java.io.ByteArrayOutputStream;
import java.io.IOError;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.file.Path;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
 * Loads and stores the fields and properties of {@link DBObject} instances.
 *
 * <p>Raw values are read from and written to a {@link FileIndexManager}; the per-object
 * field/property UID tables go through a {@link DBObjectIndicesManager}; values of type
 * {@link DBDataType#OBJECT} are serialized with Kryo.
 *
 * <p>Every method body runs while holding one private lock ({@code indicesAccessLock}).
 */
public class DataLoader {

    private final Kryo kryo = new Kryo();
    private final DBObjectIndicesManager objectIndicesManager;
    private final FileIndexManager indices;
    private final Object indicesAccessLock = new Object();
    private volatile boolean closed;
    /**
     * DO NOT USE
     */
    private final JCWDatabase databaseInstance;

    /**
     * Opens the index files and prepares the Kryo instance.
     *
     * <p>If index {@code 0} does not exist yet, an empty (null) value is allocated first, so that
     * the slot read by {@code loadRoot} is present.
     *
     * @param databaseInstance     database handed to every {@link DBObject} this loader instantiates
     * @param dataFile             data file passed to the {@link FileIndexManager}
     * @param metadataFile         metadata file passed to the {@link FileIndexManager}
     * @param registrationRequired forwarded to {@link Kryo#setRegistrationRequired(boolean)}
     * @throws IOException if the {@link FileIndexManager} cannot be created
     */
    public DataLoader(JCWDatabase databaseInstance, Path dataFile, Path metadataFile, boolean registrationRequired) throws IOException {
        synchronized (indicesAccessLock) {
            this.databaseInstance = databaseInstance;
            this.indices = new FileIndexManager(dataFile, metadataFile);
            if (!indices.has(0)) {
                allocateNullValue();
            }
            this.objectIndicesManager = new DBObjectIndicesManager(this.indices);
            kryo.setRegistrationRequired(registrationRequired);
            registerDefaultClasses();
        }
    }

    /**
     * Registers the built-in classes with consecutive ids, starting at {@code -90}
     * (i.e. Kryo id {@code 10}, see {@link #registerClass(Class, int)}).
     */
    private void registerDefaultClasses() {
        // The position in this array determines the Kryo id of each class. String and String[]
        // appear twice; the duplicates are kept on purpose because removing them would shift the
        // id of every class that follows.
        // NOTE(review): presumably these ids end up in existing data files - confirm before
        // reordering or removing entries; append new classes at the end.
        final Class<?>[] defaultClasses = {
                boolean[].class, byte[].class, short[].class, char[].class, int[].class, long[].class,
                Boolean[].class, Byte[].class, Short[].class, Character[].class, Integer[].class, Long[].class,
                String.class, String[].class,
                Boolean.class, Byte.class, Short.class, Character.class, Integer.class,
                Class.class, Object.class, Object[].class, Long.class,
                String.class, String[].class,
                boolean[][].class, byte[][].class, short[][].class, char[][].class, int[][].class, long[][].class,
                String[][].class,
                List.class, ArrayList.class, LinkedList.class,
                Set.class, HashSet.class, LinkedHashSet.class,
                Map.class, HashMap.class, LinkedHashMap.class, TreeMap.class,
                BooleanArrayList.class, ByteArrayList.class, ShortArrayList.class,
                CharArrayList.class, IntArrayList.class, LongArrayList.class,
                TreeSet.class, SortedSet.class, SortedMap.class
        };
        int id = -90;
        for (Class<?> defaultClass : defaultClasses) {
            registerClass(defaultClass, id++);
        }
    }

    /**
     * Closes the underlying {@link FileIndexManager}. Calls after the first one do nothing.
     *
     * @throws IOException if closing the index manager fails
     */
    public void close() throws IOException {
        synchronized (indicesAccessLock) {
            if (!closed) {
                closed = true;
                indices.close();
            }
        }
    }

    /**
     * Loads the fields of {@code obj} and installs its field and property metadata.
     *
     * @param obj        object to populate
     * @param objectInfo field and property UIDs of the object
     * @throws IOException if a field value cannot be read
     */
    public void preloadDBObject(DBObject obj, DBObjectIndicesManager.DBObjectInfo objectInfo) throws IOException {
        synchronized (indicesAccessLock) {
            preloadDBObjectFields(obj, objectInfo.getFields());
            preloadDBObjectProperties(obj, objectInfo.getProperties());
        }
    }

    /**
     * Loads the object stored at UID {@code 0}, or asks {@code ifAbsent} for one when that slot
     * is missing or null.
     *
     * @param rootType class of the root object
     * @param ifAbsent supplier invoked when no root object is stored
     * @param <T>      type of the root object
     * @return the stored root object, or the value produced by {@code ifAbsent}
     * @throws IOException if loading or the supplier fails
     */
    <T extends DBObject> T loadRoot(Class<T> rootType, SupplierWithIO<T> ifAbsent) throws IOException {
        synchronized (indicesAccessLock) {
            if (isDBObjectNull(0)) {
                return ifAbsent.getWithIO();
            } else {
                return loadDBObject(rootType, 0);
            }
        }
    }

    /**
     * Creates an instance of {@code type} through its public no-argument constructor and sets
     * its {@code database} field.
     *
     * @throws IOException if the constructor is missing, inaccessible, or throws
     */
    private <T extends DBObject> T instantiateDBObject(Class<T> type) throws IOException {
        synchronized (indicesAccessLock) {
            try {
                T obj = type.getConstructor().newInstance();
                obj.database = databaseInstance;
                return obj;
            } catch (NoSuchMethodException e) {
                throw new IOException("You must declare a public empty constructor in class " + type + ": public " + type.getSimpleName() + "()", e);
            } catch (IllegalAccessException | InvocationTargetException | InstantiationException e) {
                throw new IOException(e);
            }
        }
    }

    /**
     * Loads every {@link DBField}-annotated field of {@code obj} and stores the field metadata,
     * indexed by field id, on the object.
     *
     * @param obj       object whose fields are loaded
     * @param fieldUIDs UID of each field, indexed by field id
     */
    private void preloadDBObjectFields(DBObject obj, long[] fieldUIDs) throws IOException {
        synchronized (indicesAccessLock) {
            Field[] unorderedFields = getFields(obj);
            int biggestFieldId = getBiggestFieldId(unorderedFields);

            // Arrays indexed by field id; ids without a field stay null.
            Field[] fields = new Field[biggestFieldId + 1];
            DBDataType[] orderedFieldTypes = new DBDataType[biggestFieldId + 1];

            for (Field field : unorderedFields) {
                DBField fieldAnnotation = field.getAnnotation(DBField.class);
                int fieldId = fieldAnnotation.id();
                DBDataType fieldType = fieldAnnotation.type();
                loadField(obj, field, fieldType, fieldUIDs[fieldId]);
                fields[fieldId] = field;
                orderedFieldTypes[fieldId] = fieldType;
            }

            obj.setFields(fields, orderedFieldTypes, fieldUIDs);
        }
    }

    /**
     * Collects the property getters and setters of {@code obj} into arrays indexed by property
     * id and stores them on the object. Property values themselves are not read here.
     *
     * @param obj          object whose property metadata is installed
     * @param propertyUIDs UID of each property, passed through to the object
     */
    private void preloadDBObjectProperties(DBObject obj, long[] propertyUIDs) {
        synchronized (indicesAccessLock) {
            Method[] unorderedPropertyGetters = obj.getPropertyGetters();
            Method[] unorderedPropertySetters = obj.getPropertySetters();

            int biggestPropertyId = Math.max(
                    getBiggestPropertyGetterId(unorderedPropertyGetters),
                    getBiggestPropertySetterId(unorderedPropertySetters));

            DBDataType[] propertyTypes = new DBDataType[biggestPropertyId + 1];
            Method[] propertyGetters = new Method[biggestPropertyId + 1];
            Method[] propertySetters = new Method[biggestPropertyId + 1];
            Map<String, DBPropertySetter> setterMethods = new LinkedHashMap<>();
            Map<String, DBPropertyGetter> getterMethods = new LinkedHashMap<>();

            for (Method getter : unorderedPropertyGetters) {
                DBPropertyGetter propertyAnnotation = getter.getAnnotation(DBPropertyGetter.class);
                int propertyId = propertyAnnotation.id();
                propertyTypes[propertyId] = propertyAnnotation.type();
                propertyGetters[propertyId] = getter;
                getterMethods.put(getter.getName(), propertyAnnotation);
            }

            // Setters run second, so for an id that has both, the setter's declared type wins.
            for (Method setter : unorderedPropertySetters) {
                DBPropertySetter propertyAnnotation = setter.getAnnotation(DBPropertySetter.class);
                int propertyId = propertyAnnotation.id();
                propertyTypes[propertyId] = propertyAnnotation.type();
                propertySetters[propertyId] = setter;
                setterMethods.put(setter.getName(), propertyAnnotation);
            }

            obj.setProperties(propertyGetters, propertySetters, propertyTypes, propertyUIDs, setterMethods, getterMethods);
        }
    }

    /**
     * Returns the fields of {@code obj}'s class that carry the {@link DBField} annotation.
     */
    protected Field[] getFields(DBObject obj) {
        synchronized (indicesAccessLock) {
            return FieldUtils.getFieldsWithAnnotation(obj.getClass(), DBField.class);
        }
    }

    /**
     * Returns the largest {@link DBPropertyGetter#id()} among the given methods,
     * or {@code -1} when the array is empty.
     */
    int getBiggestPropertyGetterId(Method[] unorderedPropertyGetters) {
        synchronized (indicesAccessLock) {
            int biggestPropertyId = -1;
            for (Method getter : unorderedPropertyGetters) {
                int propertyId = getter.getAnnotation(DBPropertyGetter.class).id();
                if (propertyId > biggestPropertyId) {
                    biggestPropertyId = propertyId;
                }
            }
            return biggestPropertyId;
        }
    }

    /**
     * Returns the largest {@link DBPropertySetter#id()} among the given methods,
     * or {@code -1} when the array is empty.
     */
    int getBiggestPropertySetterId(Method[] unorderedPropertySetters) {
        synchronized (indicesAccessLock) {
            int biggestPropertyId = -1;
            for (Method setter : unorderedPropertySetters) {
                int propertyId = setter.getAnnotation(DBPropertySetter.class).id();
                if (propertyId > biggestPropertyId) {
                    biggestPropertyId = propertyId;
                }
            }
            return biggestPropertyId;
        }
    }

    /**
     * Returns the largest {@link DBField#id()} among the given fields,
     * or {@code -1} when the array is empty.
     */
    protected int getBiggestFieldId(Field[] unorderedFields) {
        synchronized (indicesAccessLock) {
            int biggestFieldId = -1;
            for (Field field : unorderedFields) {
                int fieldId = field.getAnnotation(DBField.class).id();
                if (fieldId > biggestFieldId) {
                    biggestFieldId = fieldId;
                }
            }
            return biggestFieldId;
        }
    }

    /**
     * Reads the value stored at {@code propertyUID} and hands it to
     * {@code obj.setLoadedProperty(propertyId, value)}.
     *
     * @param obj          object receiving the value
     * @param propertyId   id of the property being loaded
     * @param property     getter of the property; its return type is used for
     *                     {@link DBDataType#DATABASE_OBJECT} values
     * @param propertyType storage type of the property
     * @param propertyUID  UID the value is stored under
     * @throws IOException if the value cannot be read
     */
    public void loadProperty(DBObject obj, int propertyId, Method property, DBDataType propertyType, long propertyUID) throws IOException {
        synchronized (indicesAccessLock) {
            loadData(propertyType, propertyUID, property::getReturnType, (data) -> {
                synchronized (indicesAccessLock) {
                    obj.setLoadedProperty(propertyId, data);
                }
            });
        }
    }

    /**
     * Reads the value stored at {@code fieldUID} and writes it into {@code field} of {@code obj},
     * forcing access if necessary.
     *
     * @param obj       object whose field is written
     * @param field     field to write
     * @param fieldType storage type of the field
     * @param fieldUID  UID the value is stored under
     * @throws IOException if the value cannot be read, or if a non-null
     *                     {@link DBDataType#OBJECT} value is not an instance of the field type
     */
    public void loadField(DBObject obj, Field field, DBDataType fieldType, long fieldUID) throws IOException {
        synchronized (indicesAccessLock) {
            loadData(fieldType, fieldUID, field::getType, (data) -> {
                synchronized (indicesAccessLock) {
                    try {
                        if (fieldType == DBDataType.OBJECT && data != null && !field.getType().isInstance(data)) {
                            throw new IOException("There is an attempt to load an object of type " + data.getClass() + " into a field of type " + field.getType());
                        }
                        FieldUtils.writeField(field, obj, data, true);
                    } catch (IllegalAccessException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
    }

    /**
     * Reads one value of the given storage type and passes it to {@code result}.
     *
     * @param propertyType storage type that selects the reader
     * @param dataUID      UID the value is stored under
     * @param returnType   supplies the Java class to instantiate; only consulted for
     *                     {@link DBDataType#DATABASE_OBJECT}
     * @param result       receives the loaded value (primitives are boxed)
     */
    @SuppressWarnings("unchecked")
    private void loadData(DBDataType propertyType, long dataUID, Supplier<Class<?>> returnType, ConsumerWithIO result) throws IOException {
        synchronized (indicesAccessLock) {
            switch (propertyType) {
                case DATABASE_OBJECT:
                    DBObject fieldDBObjectValue = loadDBObject((Class<? extends DBObject>) returnType.get(), dataUID);
                    result.accept(fieldDBObjectValue);
                    return;
                case OBJECT:
                    Object fieldObjectValue = loadObject(dataUID);
                    result.accept(fieldObjectValue);
                    return;
                case UID_LIST:
                    LongArrayList fieldListObjectValue = loadListObject(dataUID);
                    result.accept(fieldListObjectValue);
                    return;
                case BOOLEAN:
                    boolean fieldBooleanValue = loadBoolean(dataUID);
                    result.accept(fieldBooleanValue);
                    return;
                case BYTE:
                    byte fieldByteValue = loadByte(dataUID);
                    result.accept(fieldByteValue);
                    return;
                case SHORT:
                    short fieldShortValue = loadShort(dataUID);
                    result.accept(fieldShortValue);
                    return;
                case CHAR:
                    char fieldCharValue = loadChar(dataUID);
                    result.accept(fieldCharValue);
                    return;
                case INTEGER:
                    int fieldIntValue = loadInt(dataUID);
                    result.accept(fieldIntValue);
                    return;
                case LONG:
                    long fieldLongValue = loadLong(dataUID);
                    result.accept(fieldLongValue);
                    return;
                default:
                    // Exception type kept as-is so existing callers see the same failure.
                    throw new NullPointerException("Unknown data type");
            }
        }
    }

    /**
     * Loads the {@link DBObject} stored at {@code propertyUID}.
     *
     * @param type        class to instantiate; needs a public no-argument constructor
     * @param propertyUID UID of the object
     * @param <T>         type of the object
     * @return a freshly instantiated and preloaded object, or {@code null} when no object info
     *         is stored for that UID
     * @throws IOException if instantiation or loading fails
     */
    public <T extends DBObject> T loadDBObject(Class<T> type, long propertyUID) throws IOException {
        synchronized (indicesAccessLock) {
            DBObjectIndicesManager.DBObjectInfo objectInfo = readUIDs(propertyUID);
            if (objectInfo == null) {
                return null;
            }
            T obj = instantiateDBObject(type);
            preloadDBObject(obj, objectInfo);
            return obj;
        }
    }

    /**
     * Tells whether no object info is available for {@code uid}: either the UID is unknown to
     * the indices manager or its stored info is {@code null}.
     *
     * @throws IOError if reading the object info fails
     */
    private boolean isDBObjectNull(long uid) {
        synchronized (indicesAccessLock) {
            try {
                return !objectIndicesManager.has(uid) || objectIndicesManager.get(uid) == null;
            } catch (IOException ex) {
                throw new IOError(ex);
            }
        }
    }

    /**
     * Reads a Kryo-serialized object.
     *
     * @param uid UID the object is stored under
     * @param <T> type the caller expects; the cast is unchecked
     * @return the deserialized object, or {@code null} when the stored size is {@code 0}
     * @throws IOException if the value cannot be read
     */
    @SuppressWarnings("unchecked")
    public <T> T loadObject(long uid) throws IOException {
        synchronized (indicesAccessLock) {
            return indices.get(uid, (i, size) -> {
                synchronized (indicesAccessLock) {
                    if (size != 0) {
                        return (T) kryo.readClassAndObject(i);
                    } else {
                        return null;
                    }
                }
            });
        }
    }

    /**
     * Reads a UID list: a var-int element count followed by that many var-longs.
     *
     * @return the list, or {@code null} when the stored size is {@code 0}
     */
    private LongArrayList loadListObject(long uid) throws IOException {
        synchronized (indicesAccessLock) {
            return indices.get(uid, (i, size) -> {
                synchronized (indicesAccessLock) {
                    if (size != 0) {
                        LongArrayList list = new LongArrayList();
                        int listSize = i.readVarInt(true);
                        for (int li = 0; li < listSize; li++) {
                            list.add(i.readVarLong(true));
                        }
                        return list;
                    } else {
                        return null;
                    }
                }
            });
        }
    }

    /**
     * Reads a boolean.
     *
     * @param uid UID the value is stored under
     * @return the stored value, or {@code false} when the stored size is {@code 0}
     * @throws IOException if the value cannot be read
     */
    public boolean loadBoolean(long uid) throws IOException {
        synchronized (indicesAccessLock) {
            return indices.get(uid, (i, size) -> {
                synchronized (indicesAccessLock) {
                    if (size != 0) {
                        return i.readBoolean();
                    } else {
                        return false;
                    }
                }
            });
        }
    }

    /**
     * Reads a byte.
     *
     * @param uid UID the value is stored under
     * @return the stored value, or {@code 0} when the stored size is {@code 0}
     * @throws IOException if the value cannot be read
     */
    public byte loadByte(long uid) throws IOException {
        synchronized (indicesAccessLock) {
            return indices.get(uid, (i, size) -> {
                synchronized (indicesAccessLock) {
                    if (size != 0) {
                        return i.readByte();
                    } else {
                        return (byte) 0;
                    }
                }
            });
        }
    }

    /**
     * Reads a short.
     *
     * @param uid UID the value is stored under
     * @return the stored value, or {@code 0} when the stored size is {@code 0}
     * @throws IOException if the value cannot be read
     */
    public short loadShort(long uid) throws IOException {
        synchronized (indicesAccessLock) {
            return indices.get(uid, (i, size) -> {
                synchronized (indicesAccessLock) {
                    if (size != 0) {
                        return i.readShort();
                    } else {
                        return (short) 0;
                    }
                }
            });
        }
    }

    /**
     * Reads a char.
     *
     * @param uid UID the value is stored under
     * @return the stored value, or {@code 0} when the stored size is {@code 0}
     * @throws IOException if the value cannot be read
     */
    public char loadChar(long uid) throws IOException {
        synchronized (indicesAccessLock) {
            return indices.get(uid, (i, size) -> {
                synchronized (indicesAccessLock) {
                    if (size != 0) {
                        return i.readChar();
                    } else {
                        return (char) 0;
                    }
                }
            });
        }
    }

    /**
     * Reads an int.
     *
     * @param uid UID the value is stored under
     * @return the stored value, or {@code 0} when the stored size is {@code 0}
     * @throws IOException if the value cannot be read
     */
    public int loadInt(long uid) throws IOException {
        synchronized (indicesAccessLock) {
            return indices.get(uid, (i, size) -> {
                if (size != 0) {
                    return i.readInt();
                } else {
                    return 0;
                }
            });
        }
    }

    /**
     * Reads a long.
     *
     * @param uid UID the value is stored under
     * @return the stored value, or {@code 0} when the stored size is {@code 0}
     * @throws IOException if the value cannot be read
     */
    public long loadLong(long uid) throws IOException {
        synchronized (indicesAccessLock) {
            return indices.get(uid, (i, size) -> {
                synchronized (indicesAccessLock) {
                    if (size != 0) {
                        return i.readLong();
                    } else {
                        return 0L;
                    }
                }
            });
        }
    }

    /**
     * Tells whether the object indices manager has an entry for {@code uid}.
     */
    public boolean exists(long uid) {
        synchronized (indicesAccessLock) {
            return objectIndicesManager.has(uid);
        }
    }

    /**
     * Stores the field and property UID tables of the object identified by {@code uid}.
     *
     * @throws IOException if writing fails
     */
    public void writeObjectInfo(long uid, long[] fieldUIDs, long[] propertyUIDs) throws IOException {
        synchronized (indicesAccessLock) {
            this.objectIndicesManager.set(uid, fieldUIDs, propertyUIDs);
        }
    }

    /**
     * Marks the object identified by {@code uid} as null in the object indices manager.
     */
    private void writeObjectInfoNull(long uid) throws IOException {
        synchronized (indicesAccessLock) {
            this.objectIndicesManager.setNull(uid);
        }
    }

    /**
     * Reads the field and property UID tables of an object.
     *
     * @param uid UID of the object
     * @return whatever the object indices manager returns for {@code uid}; callers in this
     *         class treat {@code null} as "no object stored"
     * @throws IOError if the underlying read throws an {@link IOException}
     */
    public DBObjectIndicesManager.DBObjectInfo readUIDs(long uid) {
        synchronized (indicesAccessLock) {
            try {
                return objectIndicesManager.get(uid);
            } catch (IOException e) {
                throw new IOError(e);
            }
        }
    }

    /**
     * Writes one value under {@code uid}, encoded according to {@code propertyType}.
     *
     * <p>For the primitive types a {@code null} value is written as {@code false} / {@code 0}.
     * A {@code null} {@link DBDataType#UID_LIST} is written as an empty (size 0) entry and a
     * {@code null} {@link DBDataType#DATABASE_OBJECT} is recorded as a null object. Data types
     * not listed in the switch are ignored.
     *
     * @param uid                 UID to write to
     * @param propertyType        storage type of the value
     * @param loadedPropertyValue value to write; must match {@code propertyType}
     * @param <T>                 static type of the value
     * @throws IOException if writing fails
     */
    public <T> void writeObjectProperty(long uid, DBDataType propertyType, T loadedPropertyValue) throws IOException {
        synchronized (indicesAccessLock) {
            switch (propertyType) {
                case BOOLEAN:
                    indices.set(uid, 1, (o) -> {
                        synchronized (indicesAccessLock) {
                            o.writeBoolean(loadedPropertyValue == null ? false : (boolean) loadedPropertyValue);
                        }
                    });
                    break;
                case BYTE:
                    indices.set(uid, Byte.BYTES, (o) -> {
                        synchronized (indicesAccessLock) {
                            o.writeByte(loadedPropertyValue == null ? 0 : (byte) loadedPropertyValue);
                        }
                    });
                    break;
                case SHORT:
                    indices.set(uid, Short.BYTES, (o) -> {
                        synchronized (indicesAccessLock) {
                            o.writeShort(loadedPropertyValue == null ? 0 : (short) loadedPropertyValue);
                        }
                    });
                    break;
                case CHAR:
                    indices.set(uid, Character.BYTES, (o) -> {
                        synchronized (indicesAccessLock) {
                            o.writeChar(loadedPropertyValue == null ? 0 : (char) loadedPropertyValue);
                        }
                    });
                    break;
                case INTEGER:
                    indices.set(uid, Integer.BYTES, (o) -> {
                        synchronized (indicesAccessLock) {
                            o.writeInt(loadedPropertyValue == null ? 0 : (int) loadedPropertyValue);
                        }
                    });
                    break;
                case LONG:
                    indices.set(uid, Long.BYTES, (o) -> {
                        synchronized (indicesAccessLock) {
                            o.writeLong(loadedPropertyValue == null ? 0 : (long) loadedPropertyValue);
                        }
                    });
                    break;
                case OBJECT:
                    // Output.toBytes() only returns what is still in the Output's own buffer.
                    // Once the buffer fills up, earlier bytes are flushed into the wrapped
                    // stream, so the complete payload must be taken from the stream itself.
                    ByteArrayOutputStream objectBytes = new ByteArrayOutputStream();
                    Output objectOutput = new Output(objectBytes);
                    kryo.writeClassAndObject(objectOutput, loadedPropertyValue);
                    objectOutput.flush();
                    byte[] out = objectBytes.toByteArray();
                    indices.set(uid, out.length, o -> {
                        synchronized (indicesAccessLock) {
                            o.write(out, 0, out.length);
                        }
                    });
                    break;
                case UID_LIST:
                    if (loadedPropertyValue == null) {
                        indices.set(uid, 0, (o) -> {
                        });
                    } else {
                        LongArrayList list = (LongArrayList) loadedPropertyValue;
                        final int listSize = list.size();
                        // No upper bound (-1): a var-long may take more than Long.BYTES bytes and
                        // the var-int size header needs room too, so a cap of
                        // Long.BYTES * listSize can overflow.
                        Output listOutput = new Output(Long.BYTES * 100, -1);
                        listOutput.writeVarInt(listSize, true);
                        for (int i = 0; i < listSize; i++) {
                            listOutput.writeVarLong(list.getLong(i), true);
                        }
                        byte[] outList = listOutput.toBytes();
                        indices.set(uid, outList.length, o -> {
                            synchronized (indicesAccessLock) {
                                o.write(outList, 0, outList.length);
                            }
                        });
                    }
                    break;
                case DATABASE_OBJECT:
                    if (loadedPropertyValue == null) {
                        writeObjectInfoNull(uid);
                    } else {
                        ((DBObject) loadedPropertyValue).writeToDisk(uid);
                    }
                    break;
            }
        }
    }

    /**
     * Registers {@code clazz} with Kryo under the id {@code 100 + id}.
     *
     * @param clazz class to register
     * @param id    caller-visible id; the built-in classes use ids starting at {@code -90}
     * @param <T>   type of the class
     */
    public <T> void registerClass(Class<T> clazz, int id) {
        synchronized (indicesAccessLock) {
            kryo.register(clazz, 100 + id);
        }
    }

    /**
     * Allocates a new entry of size {@code 0} in the index manager.
     *
     * @return the value returned by the index manager for the new entry
     */
    public long allocateNullValue() {
        synchronized (indicesAccessLock) {
            return indices.add(0);
        }
    }

    /**
     * Allocates {@code quantity} new empty entries.
     *
     * @param quantity number of entries to allocate
     * @return the allocated UIDs, in allocation order
     */
    public long[] allocateNewUIDs(int quantity) {
        synchronized (indicesAccessLock) {
            long[] ids = new long[quantity];
            for (int i = 0; i < quantity; i++) {
                ids[i] = allocateNullValue();
            }
            return ids;
        }
    }
}