package it.cavallium.strangedb.java.database; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; public class KryoSerializer implements ISerializer { private static final int KRYO_INSTANCES = 20; private ReentrantLock[] locks = new ReentrantLock[KRYO_INSTANCES]; private final Kryo[] kryo = new Kryo[KRYO_INSTANCES]; private AtomicInteger current = new AtomicInteger(0); private ConcurrentHashMap, Integer> registeredClasses = new ConcurrentHashMap<>(200); public KryoSerializer() { for (int i = 0; i < KRYO_INSTANCES; i++) { locks[i] = new ReentrantLock(false); kryo[i] = new Kryo(); kryo[i].setRegistrationRequired(false); kryo[i].setWarnUnregisteredClasses(true); } } @SuppressWarnings("unchecked") @Override public Class readClassBytes(byte[] input) { int i = current.getAndUpdate((currentInt) -> currentInt + 1 == KRYO_INSTANCES ? 0 : currentInt + 1); locks[i].lock(); try { return (Class) kryo[i].readClass(new Input(input)).getType(); } finally { locks[i].unlock(); } } @Override public byte[] writeClassBytes(Class value) { int i = current.getAndUpdate((currentInt) -> currentInt + 1 == KRYO_INSTANCES ? 0 : currentInt + 1); locks[i].lock(); try { Output out = new Output(1024, Integer.MAX_VALUE); kryo[i].writeClass(out, value); out.flush(); out.close(); return out.toBytes(); } finally { locks[i].unlock(); } } @Override public byte[] writeClassAndObjectBytes(T value) { int i = current.getAndUpdate((currentInt) -> currentInt + 1 == KRYO_INSTANCES ? 0 : currentInt + 1); locks[i].lock(); try { Output out = new Output(1024, Integer.MAX_VALUE); kryo[i].writeClassAndObject(out, value); out.flush(); out.close(); return out.toBytes(); } finally { locks[i].unlock(); } } @SuppressWarnings("unchecked") @Override public T readClassAndObjectBytes(byte[] input) { int i = current.getAndUpdate((currentInt) -> currentInt + 1 == KRYO_INSTANCES ? 0 : currentInt + 1); locks[i].lock(); try { return (T) kryo[i].readClassAndObject(new Input(input)); } finally { locks[i].unlock(); } } @Override public void registerClass(Class type, int id) { registeredClasses.put(type, id); for (int i = 0; i < KRYO_INSTANCES; i++) { locks[i].lock(); try { kryo[i].register(type, id); } finally { locks[i].unlock(); } } } @Override public Map, Integer> getRegisteredClasses() { for (int i = 0; i < KRYO_INSTANCES; i++) { locks[i].lock(); } try { return new HashMap<>(registeredClasses); } finally { for (int i = 0; i < KRYO_INSTANCES; i++) { locks[i].unlock(); } } } }