diff --git a/README.md b/README.md
index 38f9ecd..5e62867 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,55 @@
-# CavalliumDBEngine
+CavalliumDB Engine
+==================
-Database engine for Java
\ No newline at end of file
+A very simple wrapper for RocksDB and Lucene, with gRPC and direct connections.
+
+This is not a database, but only a wrapper for Lucene Core and RocksDB, with a bit of abstraction.
+
+# Features
+## RocksDB Key-Value NoSQL database engine
+- Snapshots
+- Multi-column databases
+- WAL and corruption recovery strategies
+- Multiple data types:
+ - Bytes (Singleton)
+ - Maps of bytes (Dictionary)
+ - Maps of maps of bytes (Deep dictionary)
+ - Sets of bytes (Dictionary without values)
+ - Maps of sets of bytes (Deep dictionary without values)
+
+## Apache Lucene Core indexing library
+- Documents structure
+- Sorting
+ - Ascending and descending
+ - Numeric or non-numeric
+- Searching
+ - Nested search terms
+ - Combined search terms
+ - Fuzzy text search
+ - Coordinates, integers, longs, strings, text
+- Indexing and analysis
+ - N-gram
+ - Edge N-gram
+ - English words
+ - Stemming
+ - Stopwords removal
+- Results filtering
+- Snapshots
+
+# F.A.Q.
+## Why is it so difficult?
+This is not a DBMS.
+
+This is an engine on which a DBMS can be built upon. For this reason it's very difficult to use it directly without using it through abstraction layers.
+
+## Can I use objects in the database?
+Yes, but you must serialize/deserialize them using a library of your choice.
+
+## Why is there a snapshot function for each database part?
+Since RocksDB and Lucene indices are different instances, every instance has its own snapshot function.
+
+To have a single snapshot you must implement it as a collection of sub-snapshots in your DBMS.
+
+## Is CavalliumDB Engine suitable for your project?
+No.
+This engine is largely undocumented, and it doesn't provide extensive tests on its methods.
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..6c701c7
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,276 @@
+
+ 4.0.0
+
+ CavalliumDBEngine
+
+ it.cavallium
+ dbengine
+ 2.0.1
+
+ jar
+
+ UTF-8
+ 1.29.0
+ 3.11.4
+ 3.11.4
+ 2.0.30.Final
+
+ 11
+ 11
+
+
+
+ protoarch
+ protoarch
+ https://home.apache.org/~aajisaka/repository
+
+
+ mchv-release
+ MCHV Release Apache Maven Packages
+ https://mvn.mchv.eu/repository/mchv
+
+
+ mchv-snapshot
+ MCHV Snapshot Apache Maven Packages
+ https://mvn.mchv.eu/repository/mchv-snapshot
+
+
+
+
+ org.warp
+ common-utils
+ 1.1.1
+
+
+ javax.xml.bind
+ jaxb-api
+ 2.2.11
+
+
+ com.sun.xml.bind
+ jaxb-core
+ 2.2.11
+
+
+ com.sun.xml.bind
+ jaxb-impl
+ 2.2.11
+
+
+ javax.activation
+ activation
+ 1.1.1
+
+
+ it.cavallium
+ concurrent-locks
+ 1.0.8
+
+
+ org.yaml
+ snakeyaml
+ 1.24
+
+
+ io.grpc
+ grpc-netty-shaded
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-protobuf
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-stub
+ ${grpc.version}
+
+
+ io.grpc
+ grpc-alts
+ ${grpc.version}
+
+
+ javax.annotation
+ javax.annotation-api
+ 1.2
+
+
+ io.grpc
+ grpc-testing
+ ${grpc.version}
+ test
+
+
+
+
+ io.grpc
+ grpc-netty
+ ${grpc.version}
+
+
+ io.netty
+ netty-tcnative-boringssl-static
+ ${netty.tcnative.version}
+
+
+
+ com.google.api.grpc
+ proto-google-common-protos
+ 1.0.0
+
+
+ com.google.protobuf
+ protobuf-java-util
+ ${protobuf.version}
+
+
+ org.mockito
+ mockito-core
+ 1.9.5
+ test
+
+
+ it.unimi.dsi
+ fastutil
+ 8.3.1
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ RELEASE
+ test
+
+
+ org.hamcrest
+ hamcrest-core
+
+
+
+
+
+ org.hamcrest
+ hamcrest-library
+ 1.3
+ test
+
+
+ org.rocksdb
+ rocksdbjni
+ 6.11.4
+
+
+ org.apache.lucene
+ lucene-core
+ 8.6.2
+
+
+ org.apache.lucene
+ lucene-analyzers-common
+ 8.6.2
+
+
+ org.apache.lucene
+ lucene-codecs
+ 8.6.2
+
+
+ org.apache.lucene
+ lucene-backward-codecs
+ 8.6.2
+
+
+ org.apache.lucene
+ lucene-queries
+ 8.6.2
+
+
+ org.jetbrains
+ annotations
+ 19.0.0
+
+
+ io.projectreactor
+ reactor-core
+ 3.4.0
+
+
+ io.projectreactor
+ reactor-tools
+ 3.4.0
+
+
+
+
+ src/test/java
+
+
+ ../src/main/libs
+
+ **/*.jar
+
+
+
+
+
+ kr.motd.maven
+ os-maven-plugin
+ 1.5.0.Final
+
+
+
+
+ org.apache.maven.plugins
+ maven-install-plugin
+ 3.0.0-M1
+
+
+ org.xolstice.maven.plugins
+ protobuf-maven-plugin
+ 0.5.1
+
+ com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
+ grpc-java
+ io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ ${basedir}/src/main/proto
+
+
+
+ generate-sources
+
+ compile
+ compile-custom
+
+
+
+
+
+ maven-dependency-plugin
+
+
+ compile
+
+ copy-dependencies
+
+
+ ${project.build.directory}/lib
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.1
+
+ 11
+
+ 11
+
+
+
+
+
diff --git a/src/main/java/it/cavallium/dbengine/database/Column.java b/src/main/java/it/cavallium/dbengine/database/Column.java
new file mode 100644
index 0000000..09943b8
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/Column.java
@@ -0,0 +1,58 @@
+package it.cavallium.dbengine.database;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.StringJoiner;
+
+public class Column {
+
+ private final String name;
+
+ private Column(String name) {
+ this.name = name;
+ }
+
+ public static Column hashMap(String name) {
+ return new Column("hash_map_" + name);
+ }
+
+ public static Column fixedSet(String name) {
+ return new Column("hash_set_" + name);
+ }
+
+ public static Column special(String name) {
+ return new Column(name);
+ }
+
+ public static String toString(byte[] name) {
+ return new String(name, StandardCharsets.US_ASCII);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Column)) {
+ return false;
+ }
+ Column column = (Column) o;
+ return Objects.equals(name, column.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name);
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", Column.class.getSimpleName() + "[", "]")
+ .add("name='" + name + "'")
+ .toString();
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/EnglishItalianStopFilter.java b/src/main/java/it/cavallium/dbengine/database/EnglishItalianStopFilter.java
new file mode 100644
index 0000000..dfacbca
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/EnglishItalianStopFilter.java
@@ -0,0 +1,1005 @@
+package it.cavallium.dbengine.database;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.lucene.analysis.CharArraySet;
+import org.apache.lucene.analysis.StopFilter;
+import org.apache.lucene.analysis.TokenStream;
+
+public class EnglishItalianStopFilter extends StopFilter {
+
+ private static final CharArraySet stopWords;
+
+ /**
+ * Constructs a filter which removes words from the input TokenStream that are named in the Set.
+ *
+ * @param in Input stream
+ * @see #makeStopSet(String...)
+ */
+ public EnglishItalianStopFilter(TokenStream in) {
+ super(in, stopWords);
+ }
+
+ static {
+ var englishStopWords = Set.of("a",
+ "an",
+ "and",
+ "are",
+ "as",
+ "at",
+ "be",
+ "but",
+ "by",
+ "for",
+ "if",
+ "in",
+ "into",
+ "is",
+ "it",
+ "no",
+ "not",
+ "of",
+ "on",
+ "or",
+ "such",
+ "that",
+ "the",
+ "their",
+ "then",
+ "there",
+ "these",
+ "they",
+ "this",
+ "to",
+ "was",
+ "will",
+ "with"
+ );
+ var oldItalianStopWords = Set.of("a",
+ "abbastanza",
+ "abbia",
+ "abbiamo",
+ "abbiano",
+ "abbiate",
+ "accidenti",
+ "ad",
+ "adesso",
+ "affinché",
+ "agl",
+ "agli",
+ "ahime",
+ "ahimè",
+ "ai",
+ "al",
+ "alcuna",
+ "alcuni",
+ "alcuno",
+ "all",
+ "alla",
+ "alle",
+ "allo",
+ "allora",
+ "altre",
+ "altri",
+ "altrimenti",
+ "altro",
+ "altrove",
+ "altrui",
+ "anche",
+ "ancora",
+ "anni",
+ "anno",
+ "ansa",
+ "anticipo",
+ "assai",
+ "attesa",
+ "attraverso",
+ "avanti",
+ "avemmo",
+ "avendo",
+ "avente",
+ "aver",
+ "avere",
+ "averlo",
+ "avesse",
+ "avessero",
+ "avessi",
+ "avessimo",
+ "aveste",
+ "avesti",
+ "avete",
+ "aveva",
+ "avevamo",
+ "avevano",
+ "avevate",
+ "avevi",
+ "avevo",
+ "avrai",
+ "avranno",
+ "avrebbe",
+ "avrebbero",
+ "avrei",
+ "avremmo",
+ "avremo",
+ "avreste",
+ "avresti",
+ "avrete",
+ "avrà",
+ "avrò",
+ "avuta",
+ "avute",
+ "avuti",
+ "avuto",
+ "basta",
+ "ben",
+ "bene",
+ "benissimo",
+ "brava",
+ "bravo",
+ "buono",
+ "c",
+ "caso",
+ "cento",
+ "certa",
+ "certe",
+ "certi",
+ "certo",
+ "che",
+ "chi",
+ "chicchessia",
+ "chiunque",
+ "ci",
+ "ciascuna",
+ "ciascuno",
+ "cima",
+ "cinque",
+ "cio",
+ "cioe",
+ "cioè",
+ "circa",
+ "citta",
+ "città",
+ "ciò",
+ "co",
+ "codesta",
+ "codesti",
+ "codesto",
+ "cogli",
+ "coi",
+ "col",
+ "colei",
+ "coll",
+ "coloro",
+ "colui",
+ "come",
+ "cominci",
+ "comprare",
+ "comunque",
+ "con",
+ "concernente",
+ "conclusione",
+ "consecutivi",
+ "consecutivo",
+ "consiglio",
+ "contro",
+ "cortesia",
+ "cos",
+ "cosa",
+ "cosi",
+ "così",
+ "cui",
+ "d",
+ "da",
+ "dagl",
+ "dagli",
+ "dai",
+ "dal",
+ "dall",
+ "dalla",
+ "dalle",
+ "dallo",
+ "dappertutto",
+ "davanti",
+ "degl",
+ "degli",
+ "dei",
+ "del",
+ "dell",
+ "della",
+ "delle",
+ "dello",
+ "dentro",
+ "detto",
+ "deve",
+ "devo",
+ "di",
+ "dice",
+ "dietro",
+ "dire",
+ "dirimpetto",
+ "diventa",
+ "diventare",
+ "diventato",
+ "dopo",
+ "doppio",
+ "dov",
+ "dove",
+ "dovra",
+ "dovrà",
+ "dovunque",
+ "due",
+ "dunque",
+ "durante",
+ "e",
+ "ebbe",
+ "ebbero",
+ "ebbi",
+ "ecc",
+ "ecco",
+ "ed",
+ "effettivamente",
+ "egli",
+ "ella",
+ "entrambi",
+ "eppure",
+ "era",
+ "erano",
+ "eravamo",
+ "eravate",
+ "eri",
+ "ero",
+ "esempio",
+ "esse",
+ "essendo",
+ "esser",
+ "essere",
+ "essi",
+ "ex",
+ "fa",
+ "faccia",
+ "facciamo",
+ "facciano",
+ "facciate",
+ "faccio",
+ "facemmo",
+ "facendo",
+ "facesse",
+ "facessero",
+ "facessi",
+ "facessimo",
+ "faceste",
+ "facesti",
+ "faceva",
+ "facevamo",
+ "facevano",
+ "facevate",
+ "facevi",
+ "facevo",
+ "fai",
+ "fanno",
+ "farai",
+ "faranno",
+ "fare",
+ "farebbe",
+ "farebbero",
+ "farei",
+ "faremmo",
+ "faremo",
+ "fareste",
+ "faresti",
+ "farete",
+ "farà",
+ "farò",
+ "fatto",
+ "favore",
+ "fece",
+ "fecero",
+ "feci",
+ "fin",
+ "finalmente",
+ "finche",
+ "fine",
+ "fino",
+ "forse",
+ "forza",
+ "fosse",
+ "fossero",
+ "fossi",
+ "fossimo",
+ "foste",
+ "fosti",
+ "fra",
+ "frattempo",
+ "fu",
+ "fui",
+ "fummo",
+ "fuori",
+ "furono",
+ "futuro",
+ "generale",
+ "gente",
+ "gia",
+ "giacche",
+ "giorni",
+ "giorno",
+ "giu",
+ "già",
+ "gli",
+ "gliela",
+ "gliele",
+ "glieli",
+ "glielo",
+ "gliene",
+ "grande",
+ "grazie",
+ "gruppo",
+ "ha",
+ "haha",
+ "hai",
+ "hanno",
+ "ho",
+ "i",
+ "ie",
+ "ieri",
+ "il",
+ "improvviso",
+ "in",
+ "inc",
+ "indietro",
+ "infatti",
+ "inoltre",
+ "insieme",
+ "intanto",
+ "intorno",
+ "invece",
+ "io",
+ "l",
+ "la",
+ "lasciato",
+ "lato",
+ "le",
+ "lei",
+ "li",
+ "lo",
+ "lontano",
+ "loro",
+ "lui",
+ "lungo",
+ "luogo",
+ "là",
+ "ma",
+ "macche",
+ "magari",
+ "maggior",
+ "mai",
+ "male",
+ "malgrado",
+ "malissimo",
+ "me",
+ "medesimo",
+ "mediante",
+ "meglio",
+ "meno",
+ "mentre",
+ "mesi",
+ "mezzo",
+ "mi",
+ "mia",
+ "mie",
+ "miei",
+ "mila",
+ "miliardi",
+ "milioni",
+ "minimi",
+ "mio",
+ "modo",
+ "molta",
+ "molti",
+ "moltissimo",
+ "molto",
+ "momento",
+ "mondo",
+ "ne",
+ "negl",
+ "negli",
+ "nei",
+ "nel",
+ "nell",
+ "nella",
+ "nelle",
+ "nello",
+ "nemmeno",
+ "neppure",
+ "nessun",
+ "nessuna",
+ "nessuno",
+ "niente",
+ "no",
+ "noi",
+ "nome",
+ "non",
+ "nondimeno",
+ "nonostante",
+ "nonsia",
+ "nostra",
+ "nostre",
+ "nostri",
+ "nostro",
+ "novanta",
+ "nove",
+ "nulla",
+ "nuovi",
+ "nuovo",
+ "o",
+ "od",
+ "oggi",
+ "ogni",
+ "ognuna",
+ "ognuno",
+ "oltre",
+ "oppure",
+ "ora",
+ "ore",
+ "osi",
+ "ossia",
+ "ottanta",
+ "otto",
+ "paese",
+ "parecchi",
+ "parecchie",
+ "parecchio",
+ "parte",
+ "partendo",
+ "peccato",
+ "peggio",
+ "per",
+ "perche",
+ "perchè",
+ "perché",
+ "percio",
+ "perciò",
+ "perfino",
+ "pero",
+ "persino",
+ "persone",
+ "però",
+ "piedi",
+ "pieno",
+ "piglia",
+ "piu",
+ "piuttosto",
+ "più",
+ "po",
+ "pochissimo",
+ "poco",
+ "poi",
+ "poiche",
+ "possa",
+ "possedere",
+ "posteriore",
+ "posto",
+ "potrebbe",
+ "preferibilmente",
+ "presa",
+ "press",
+ "prima",
+ "primo",
+ "principalmente",
+ "probabilmente",
+ "promesso",
+ "proprio",
+ "puo",
+ "pure",
+ "purtroppo",
+ "può",
+ "qua",
+ "qualche",
+ "qualcosa",
+ "qualcuna",
+ "qualcuno",
+ "quale",
+ "quali",
+ "qualunque",
+ "quando",
+ "quanta",
+ "quante",
+ "quanti",
+ "quanto",
+ "quantunque",
+ "quarto",
+ "quasi",
+ "quattro",
+ "quel",
+ "quella",
+ "quelle",
+ "quelli",
+ "quello",
+ "quest",
+ "questa",
+ "queste",
+ "questi",
+ "questo",
+ "qui",
+ "quindi",
+ "quinto",
+ "realmente",
+ "recente",
+ "recentemente",
+ "registrazione",
+ "relativo",
+ "riecco",
+ "rispetto",
+ "salvo",
+ "sara",
+ "sarai",
+ "saranno",
+ "sarebbe",
+ "sarebbero",
+ "sarei",
+ "saremmo",
+ "saremo",
+ "sareste",
+ "saresti",
+ "sarete",
+ "sarà",
+ "sarò",
+ "scola",
+ "scopo",
+ "scorso",
+ "se",
+ "secondo",
+ "seguente",
+ "seguito",
+ "sei",
+ "sembra",
+ "sembrare",
+ "sembrato",
+ "sembrava",
+ "sembri",
+ "sempre",
+ "senza",
+ "sette",
+ "si",
+ "sia",
+ "siamo",
+ "siano",
+ "siate",
+ "siete",
+ "sig",
+ "solito",
+ "solo",
+ "soltanto",
+ "sono",
+ "sopra",
+ "soprattutto",
+ "sotto",
+ "spesso",
+ "sta",
+ "stai",
+ "stando",
+ "stanno",
+ "starai",
+ "staranno",
+ "starebbe",
+ "starebbero",
+ "starei",
+ "staremmo",
+ "staremo",
+ "stareste",
+ "staresti",
+ "starete",
+ "starà",
+ "starò",
+ "stata",
+ "state",
+ "stati",
+ "stato",
+ "stava",
+ "stavamo",
+ "stavano",
+ "stavate",
+ "stavi",
+ "stavo",
+ "stemmo",
+ "stessa",
+ "stesse",
+ "stessero",
+ "stessi",
+ "stessimo",
+ "stesso",
+ "steste",
+ "stesti",
+ "stette",
+ "stettero",
+ "stetti",
+ "stia",
+ "stiamo",
+ "stiano",
+ "stiate",
+ "sto",
+ "su",
+ "sua",
+ "subito",
+ "successivamente",
+ "successivo",
+ "sue",
+ "sugl",
+ "sugli",
+ "sui",
+ "sul",
+ "sull",
+ "sulla",
+ "sulle",
+ "sullo",
+ "suo",
+ "suoi",
+ "tale",
+ "tali",
+ "talvolta",
+ "tanto",
+ "te",
+ "tempo",
+ "terzo",
+ "th",
+ "ti",
+ "titolo",
+ "tra",
+ "tranne",
+ "tre",
+ "trenta",
+ "triplo",
+ "troppo",
+ "trovato",
+ "tu",
+ "tua",
+ "tue",
+ "tuo",
+ "tuoi",
+ "tutta",
+ "tuttavia",
+ "tutte",
+ "tutti",
+ "tutto",
+ "uguali",
+ "ulteriore",
+ "ultimo",
+ "un",
+ "una",
+ "uno",
+ "uomo",
+ "va",
+ "vai",
+ "vale",
+ "vari",
+ "varia",
+ "varie",
+ "vario",
+ "verso",
+ "vi",
+ "vicino",
+ "visto",
+ "vita",
+ "voi",
+ "volta",
+ "volte",
+ "vostra",
+ "vostre",
+ "vostri",
+ "vostro",
+ "è");
+ var italianStopWords = Set.of("a",
+ "abbia",
+ "abbiamo",
+ "abbiano",
+ "abbiate",
+ "ad",
+ "adesso",
+ "agl",
+ "agli",
+ "ai",
+ "al",
+ "all",
+ "alla",
+ "alle",
+ "allo",
+ "allora",
+ "altre",
+ "altri",
+ "altro",
+ "anche",
+ "ancora",
+ "avemmo",
+ "avendo",
+ "avere",
+ "avesse",
+ "avessero",
+ "avessi",
+ "avessimo",
+ "aveste",
+ "avesti",
+ "avete",
+ "aveva",
+ "avevamo",
+ "avevano",
+ "avevate",
+ "avevi",
+ "avevo",
+ "avrai",
+ "avranno",
+ "avrebbe",
+ "avrebbero",
+ "avrei",
+ "avremmo",
+ "avremo",
+ "avreste",
+ "avresti",
+ "avrete",
+ "avrà",
+ "avrò",
+ "avuta",
+ "avute",
+ "avuti",
+ "avuto",
+ "c",
+ "che",
+ "chi",
+ "ci",
+ "coi",
+ "col",
+ "come",
+ "con",
+ "contro",
+ "cui",
+ "da",
+ "dagl",
+ "dagli",
+ "dai",
+ "dal",
+ "dall",
+ "dalla",
+ "dalle",
+ "dallo",
+ "degl",
+ "degli",
+ "dei",
+ "del",
+ "dell",
+ "della",
+ "delle",
+ "dello",
+ "dentro",
+ "di",
+ "dov",
+ "dove",
+ "e",
+ "ebbe",
+ "ebbero",
+ "ebbi",
+ "ecco",
+ "ed",
+ "era",
+ "erano",
+ "eravamo",
+ "eravate",
+ "eri",
+ "ero",
+ "essendo",
+ "faccia",
+ "facciamo",
+ "facciano",
+ "facciate",
+ "faccio",
+ "facemmo",
+ "facendo",
+ "facesse",
+ "facessero",
+ "facessi",
+ "facessimo",
+ "faceste",
+ "facesti",
+ "faceva",
+ "facevamo",
+ "facevano",
+ "facevate",
+ "facevi",
+ "facevo",
+ "fai",
+ "fanno",
+ "farai",
+ "faranno",
+ "fare",
+ "farebbe",
+ "farebbero",
+ "farei",
+ "faremmo",
+ "faremo",
+ "fareste",
+ "faresti",
+ "farete",
+ "farà",
+ "farò",
+ "fece",
+ "fecero",
+ "feci",
+ "fino",
+ "fosse",
+ "fossero",
+ "fossi",
+ "fossimo",
+ "foste",
+ "fosti",
+ "fra",
+ "fu",
+ "fui",
+ "fummo",
+ "furono",
+ "giù",
+ "gli",
+ "ha",
+ "hai",
+ "hanno",
+ "ho",
+ "i",
+ "il",
+ "in",
+ "io",
+ "l",
+ "la",
+ "le",
+ "lei",
+ "li",
+ "lo",
+ "loro",
+ "lui",
+ "ma",
+ "me",
+ "mi",
+ "mia",
+ "mie",
+ "miei",
+ "mio",
+ "ne",
+ "negl",
+ "negli",
+ "nei",
+ "nel",
+ "nell",
+ "nella",
+ "nelle",
+ "nello",
+ "no",
+ "noi",
+ "non",
+ "nostra",
+ "nostre",
+ "nostri",
+ "nostro",
+ "o",
+ "per",
+ "perché",
+ "però",
+ "più",
+ "pochi",
+ "poco",
+ "qua",
+ "quale",
+ "quanta",
+ "quante",
+ "quanti",
+ "quanto",
+ "quasi",
+ "quella",
+ "quelle",
+ "quelli",
+ "quello",
+ "questa",
+ "queste",
+ "questi",
+ "questo",
+ "qui",
+ "quindi",
+ "sarai",
+ "saranno",
+ "sarebbe",
+ "sarebbero",
+ "sarei",
+ "saremmo",
+ "saremo",
+ "sareste",
+ "saresti",
+ "sarete",
+ "sarà",
+ "sarò",
+ "se",
+ "sei",
+ "senza",
+ "si",
+ "sia",
+ "siamo",
+ "siano",
+ "siate",
+ "siete",
+ "sono",
+ "sopra",
+ "sotto",
+ "sta",
+ "stai",
+ "stando",
+ "stanno",
+ "starai",
+ "staranno",
+ "stare",
+ "starebbe",
+ "starebbero",
+ "starei",
+ "staremmo",
+ "staremo",
+ "stareste",
+ "staresti",
+ "starete",
+ "starà",
+ "starò",
+ "stava",
+ "stavamo",
+ "stavano",
+ "stavate",
+ "stavi",
+ "stavo",
+ "stemmo",
+ "stesse",
+ "stessero",
+ "stessi",
+ "stessimo",
+ "stesso",
+ "steste",
+ "stesti",
+ "stette",
+ "stettero",
+ "stetti",
+ "stia",
+ "stiamo",
+ "stiano",
+ "stiate",
+ "sto",
+ "su",
+ "sua",
+ "sue",
+ "sugl",
+ "sugli",
+ "sui",
+ "sul",
+ "sull",
+ "sulla",
+ "sulle",
+ "sullo",
+ "suo",
+ "suoi",
+ "te",
+ "ti",
+ "tra",
+ "tu",
+ "tua",
+ "tue",
+ "tuo",
+ "tuoi",
+ "tutti",
+ "tutto",
+ "un",
+ "una",
+ "uno",
+ "vai",
+ "vi",
+ "voi",
+ "vostra",
+ "vostre",
+ "vostri",
+ "vostro",
+ "è"
+ );
+ stopWords = CharArraySet.copy(Stream
+ .concat(englishStopWords.stream(), oldItalianStopWords.stream())
+ .map(String::toCharArray)
+ .collect(Collectors.toSet()));
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java
new file mode 100644
index 0000000..3caca79
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLDatabaseConnection.java
@@ -0,0 +1,26 @@
+package it.cavallium.dbengine.database;
+
+import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+
/**
 * A connection to a database engine instance from which key-value databases
 * and Lucene indices can be obtained.
 *
 * <p>NOTE(review): generic type parameters appear to have been stripped from
 * this revision (raw {@code List columns} — presumably {@code List<Column>});
 * verify against the original source.
 */
public interface LLDatabaseConnection {

	/** Opens the connection. Must be called before requesting databases or indices. */
	void connect() throws IOException;

	/** Opens (or creates) the named key-value database with the given columns. */
	LLKeyValueDatabase getDatabase(String name, List columns, boolean lowMemory) throws IOException;

	/**
	 * Opens (or creates) the named Lucene index.
	 *
	 * @param instancesCount presumably the number of parallel Lucene instances (sharding) — verify
	 * @param textFieldsAnalyzer analyzer applied to text fields
	 * @param queryRefreshDebounceTime presumably the minimum interval between searcher refreshes — verify
	 * @param commitDebounceTime presumably the minimum interval between index commits — verify
	 * @param lowMemory true to tune the index for low-memory environments
	 */
	LLLuceneIndex getLuceneIndex(String name,
			int instancesCount,
			TextFieldsAnalyzer textFieldsAnalyzer,
			Duration queryRefreshDebounceTime,
			Duration commitDebounceTime,
			boolean lowMemory) throws IOException;

	/** Closes the connection. */
	void disconnect() throws IOException;

	/** Checks that the connection is alive. */
	void ping() throws IOException;

	/** Returns the measured average latency of the connection, in milliseconds. */
	double getMediumLatencyMillis() throws IOException;
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLDeepDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDeepDictionary.java
new file mode 100644
index 0000000..3e25275
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLDeepDictionary.java
@@ -0,0 +1,69 @@
+package it.cavallium.dbengine.database;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.jetbrains.annotations.Nullable;
+import org.warp.commonutils.concurrency.atomicity.NotAtomic;
+import org.warp.commonutils.functional.TriConsumer;
+import org.warp.commonutils.functional.TriFunction;
+import org.warp.commonutils.type.Bytes;
+import org.warp.commonutils.type.UnmodifiableIterableMap;
+import org.warp.commonutils.type.UnmodifiableMap;
+
/**
 * A two-level dictionary: a map from key1 to an inner map from key2 to value,
 * backed by a single column. Operations are not atomic.
 *
 * <p>NOTE(review): generic type parameters appear to have been stripped from
 * this revision (raw {@code Optional}, {@code UnmodifiableIterableMap},
 * {@code Optional>} …); presumably all byte[]-based — verify against the
 * original source.
 */
@NotAtomic
public interface LLDeepDictionary extends LLKeyValueDatabaseStructure {

	/** Returns the whole inner map stored under key1. */
	UnmodifiableIterableMap get(@Nullable LLSnapshot snapshot, byte[] key1) throws IOException;

	/** Returns the value stored under (key1, key2), if any. */
	Optional get(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException;


	/** Returns true if no entry exists under key1. */
	boolean isEmpty(@Nullable LLSnapshot snapshot, byte[] key1);

	/** Returns true if a value exists under (key1, key2). */
	boolean contains(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException;

	/**
	 * Note: this will remove previous elements because it replaces the entire map of key
	 */
	void put(byte[] key1, UnmodifiableIterableMap value) throws IOException;

	/** Puts a single value under (key1, key2); the returned result depends on resultType. */
	Optional put(byte[] key1, byte[] key2, byte[] value, LLDictionaryResultType resultType) throws IOException;


	/** Replaces the inner map of each keys1[i] with values[i]. */
	void putMulti(byte[][] keys1, UnmodifiableIterableMap[] values) throws IOException;

	/** Puts many (key2, value) pairs under a single key1; per-entry results go to the consumer. */
	void putMulti(byte[] key1, byte[][] keys2, byte[][] values, LLDictionaryResultType resultType, Consumer responses) throws IOException;

	/** Puts many (key1, key2, value) triples; per-entry results go to the consumer. */
	void putMulti(byte[][] keys1, byte[][] keys2, byte[][] values, LLDictionaryResultType resultType, Consumer responses) throws IOException;


	/** Removes every entry in this dictionary. */
	void clear() throws IOException;

	/** Removes the whole inner map under key1; the returned result depends on resultType. */
	Optional> clear(byte[] key1, LLDictionaryResultType resultType) throws IOException;

	/** Removes the value under (key1, key2); the returned result depends on resultType. */
	Optional remove(byte[] key1, byte[] key2, LLDictionaryResultType resultType) throws IOException;


	/** Iterates every (key1, key2, value) triple; may call the consumer from multiple threads. */
	void forEach(@Nullable LLSnapshot snapshot, int parallelism, TriConsumer consumer);

	/** Iterates every (key1, inner map) pair; may call the consumer from multiple threads. */
	void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer> consumer);

	/** Iterates every (key2, value) pair under key1; may call the consumer from multiple threads. */
	void forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key1, BiConsumer consumer);


	/** Rewrites every triple through the function; may run in parallel. */
	void replaceAll(int parallelism, boolean replaceKeys, TriFunction> consumer) throws IOException;

	/** Rewrites every (key1, inner map) pair through the function; may run in parallel. */
	void replaceAll(int parallelism, boolean replaceKeys, BiFunction, Entry>> consumer) throws IOException;

	/** Rewrites every (key2, value) pair under key1 through the function; may run in parallel. */
	void replaceAll(int parallelism, boolean replaceKeys, byte[] key1, BiFunction> consumer) throws IOException;


	/** Returns the number of entries; fast=true may return an estimate. */
	long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException;

	/** Returns the exact number of entries under key1. */
	long exactSize(@Nullable LLSnapshot snapshot, byte[] key1);
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
new file mode 100644
index 0000000..4fc782e
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java
@@ -0,0 +1,44 @@
+package it.cavallium.dbengine.database;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import org.jetbrains.annotations.Nullable;
+import org.warp.commonutils.concurrency.atomicity.NotAtomic;
+
/**
 * A flat byte[]-keyed dictionary backed by a single column.
 * Operations are not atomic.
 *
 * <p>NOTE(review): generic type parameters appear to have been stripped from
 * this revision (raw {@code Optional}, {@code Entry}, {@code Consumer} …);
 * presumably all byte[]-based — verify against the original source.
 */
@NotAtomic
public interface LLDictionary extends LLKeyValueDatabaseStructure {

	/** Returns the value stored under key, if any. */
	Optional get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException;

	/** Returns true if a value exists under key. */
	boolean contains(@Nullable LLSnapshot snapshot, byte[] key) throws IOException;

	/** Puts a value under key; the returned result depends on resultType. */
	Optional put(byte[] key, byte[] value, LLDictionaryResultType resultType)
			throws IOException;

	/** Puts each value[i] under key[i]; per-entry results go to the consumer. */
	void putMulti(byte[][] key, byte[][] value, LLDictionaryResultType resultType,
			Consumer responses) throws IOException;

	/** Removes the value under key; the returned result depends on resultType. */
	Optional remove(byte[] key, LLDictionaryResultType resultType) throws IOException;

	/**
	 * This method can call the consumer from different threads in parallel
	 */
	void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer consumer);

	/**
	 * This method can call the consumer from different threads in parallel
	 */
	void replaceAll(int parallelism, boolean replaceKeys, BiFunction> consumer) throws IOException;

	/** Removes every entry. */
	void clear() throws IOException;

	/** Returns the number of entries; fast=true may return an estimate. */
	long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException;

	/** Returns true if the dictionary has no entries. */
	boolean isEmpty(@Nullable LLSnapshot snapshot) throws IOException;

	/** Removes and returns an arbitrary entry, if any. */
	Optional> removeOne() throws IOException;
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionaryResultType.java b/src/main/java/it/cavallium/dbengine/database/LLDictionaryResultType.java
new file mode 100644
index 0000000..317505c
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLDictionaryResultType.java
@@ -0,0 +1,36 @@
+package it.cavallium.dbengine.database;
+
+import org.jetbrains.annotations.Nullable;
+
+public enum LLDictionaryResultType {
+ VOID, VALUE_CHANGED, PREVIOUS_VALUE;
+
+ public static LLDictionaryResultType valueOf(@Nullable it.cavallium.dbengine.proto.LLDictionaryResultType resultType) {
+ if (resultType == null || resultType == it.cavallium.dbengine.proto.LLDictionaryResultType.UNRECOGNIZED) {
+ return VOID;
+ }
+
+ switch (resultType) {
+ case PREVIOUS_VALUE:
+ return PREVIOUS_VALUE;
+ case VALUE_CHANGED:
+ return VALUE_CHANGED;
+ case VOID:
+ return VOID;
+ }
+ return VOID;
+ }
+
+ public it.cavallium.dbengine.proto.LLDictionaryResultType toProto() {
+ switch (this) {
+ case VALUE_CHANGED:
+ return it.cavallium.dbengine.proto.LLDictionaryResultType.VALUE_CHANGED;
+ case PREVIOUS_VALUE:
+ return it.cavallium.dbengine.proto.LLDictionaryResultType.PREVIOUS_VALUE;
+ case VOID:
+ return it.cavallium.dbengine.proto.LLDictionaryResultType.VOID;
+ }
+
+ return it.cavallium.dbengine.proto.LLDictionaryResultType.UNRECOGNIZED;
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLDocument.java b/src/main/java/it/cavallium/dbengine/database/LLDocument.java
new file mode 100644
index 0000000..3b9ce9a
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLDocument.java
@@ -0,0 +1,49 @@
+package it.cavallium.dbengine.database;
+
+import java.util.Arrays;
+import org.jetbrains.annotations.Nullable;
+
+public class LLDocument {
+
+ private final LLItem[] items;
+
+ public LLDocument(LLItem[] items) {
+ this.items = items;
+ }
+
+ public LLItem[] getItems() {
+ return items;
+ }
+
+ @Override
+ public String toString() {
+ return Arrays.toString(items);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LLDocument that = (LLDocument) o;
+ return Arrays.equals(items, that.items);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(items);
+ }
+
+ @Nullable
+ public LLItem getField(String uid) {
+ for (LLItem item : items) {
+ if (item.getName().equals(uid)) {
+ return item;
+ }
+ }
+ return null;
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLItem.java b/src/main/java/it/cavallium/dbengine/database/LLItem.java
new file mode 100644
index 0000000..55f1c8c
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLItem.java
@@ -0,0 +1,144 @@
+package it.cavallium.dbengine.database;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.StringJoiner;
+import org.apache.lucene.document.Field;
+import org.jetbrains.annotations.Nullable;
+
+public class LLItem {
+
+ private final LLType type;
+ private final String name;
+ private final byte[] data;
+ @Nullable
+ private final byte[] data2;
+
+ public LLItem(LLType type, String name, byte[] data, @Nullable byte[] data2) {
+ this.type = type;
+ this.name = name;
+ this.data = data;
+ this.data2 = data2;
+ }
+
+ private LLItem(LLType type, String name, String data) {
+ this.type = type;
+ this.name = name;
+ this.data = data.getBytes(StandardCharsets.UTF_8);
+ this.data2 = null;
+ }
+
+ private LLItem(LLType type, String name, int data) {
+ this.type = type;
+ this.name = name;
+ this.data = Ints.toByteArray(data);
+ this.data2 = null;
+ }
+
+ private LLItem(LLType type, String name, float data) {
+ this.type = type;
+ this.name = name;
+ this.data = ByteBuffer.allocate(4).putFloat(data).array();;
+ this.data2 = null;
+ }
+
+ private LLItem(LLType type, String name, long data) {
+ this.type = type;
+ this.name = name;
+ this.data = Longs.toByteArray(data);
+ this.data2 = null;
+ }
+
+ public static LLItem newIntPoint(String name, int data) {
+ return new LLItem(LLType.IntPoint, name, data);
+ }
+
+ public static LLItem newLongPoint(String name, long data) {
+ return new LLItem(LLType.LongPoint, name, data);
+ }
+
+ public static LLItem newFloatPoint(String name, float data) {
+ return new LLItem(LLType.FloatPoint, name, data);
+ }
+
+ public static LLItem newTextField(String name, String data, Field.Store store) {
+ if (store == Field.Store.YES) {
+ return new LLItem(LLType.TextFieldStored, name, data);
+ } else {
+ return new LLItem(LLType.TextField, name, data);
+ }
+ }
+
+ public static LLItem newStringField(String name, String data, Field.Store store) {
+ if (store == Field.Store.YES) {
+ return new LLItem(LLType.StringFieldStored, name, data);
+ } else {
+ return new LLItem(LLType.StringField, name, data);
+ }
+ }
+
+ public static LLItem newSortedNumericDocValuesField(String name, long data) {
+ return new LLItem(LLType.SortedNumericDocValuesField, name, data);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public LLType getType() {
+ return type;
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+
+ public byte[] getData2() {
+ return data2;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LLItem llItem = (LLItem) o;
+ return type == llItem.type &&
+ Objects.equals(name, llItem.name) &&
+ Arrays.equals(data, llItem.data) &&
+ Arrays.equals(data2, llItem.data2);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(type, name);
+ result = 31 * result + Arrays.hashCode(data);
+ result = 31 * result + Arrays.hashCode(data2);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ var sj = new StringJoiner(", ", "[", "]")
+ .add("type=" + type)
+ .add("name='" + name + "'");
+ if (data != null && data.length > 0) {
+ sj.add("data=" + new String(data));
+ }
+ if (data2 != null && data2.length > 0) {
+ sj.add("data2=" + new String(data2));
+ }
+ return sj.toString();
+ }
+
+ public String stringValue() {
+ return new String(data, StandardCharsets.UTF_8);
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java b/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java
new file mode 100644
index 0000000..41bdc57
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLKeyScore.java
@@ -0,0 +1,48 @@
+package it.cavallium.dbengine.database;
+
+import java.util.Objects;
+
/**
 * A search hit: the matched document key together with its relevance score.
 */
public class LLKeyScore {

	private final String key;
	private final float score;

	public LLKeyScore(String key, float score) {
		this.key = key;
		this.score = score;
	}

	public String getKey() {
		return key;
	}

	public float getScore() {
		return score;
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}
		LLKeyScore that = (LLKeyScore) o;
		// Float.compare (instead of ==) keeps equals consistent with hashCode for
		// NaN scores: Objects.hash uses Float.hashCode, which treats all NaNs as
		// equal, while NaN == NaN is false.
		return Float.compare(score, that.score) == 0 &&
				Objects.equals(key, that.key);
	}

	@Override
	public int hashCode() {
		return Objects.hash(key, score);
	}

	@Override
	public String toString() {
		return "LLKeyScore{" +
				"key=" + key +
				", score=" + score +
				'}';
	}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java
new file mode 100644
index 0000000..8372ae3
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java
@@ -0,0 +1,65 @@
+package it.cavallium.dbengine.database;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import it.cavallium.dbengine.database.structures.LLDeepMap;
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import it.cavallium.dbengine.database.structures.LLFixedDeepSet;
+import it.cavallium.dbengine.database.structures.LLInt;
+import it.cavallium.dbengine.database.structures.LLLong;
+import it.cavallium.dbengine.database.structures.LLMap;
+import it.cavallium.dbengine.database.structures.LLSet;
+
+public interface LLKeyValueDatabase extends Closeable, LLSnapshottable, LLKeyValueDatabaseStructure {
+
+ LLSingleton getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue)
+ throws IOException;
+
+ LLDictionary getDictionary(byte[] columnName) throws IOException;
+
+ LLDeepDictionary getDeepDictionary(byte[] columnName, int keySize, int key2Size) throws IOException;
+
+ default LLSet getSet(String name) throws IOException {
+ LLDictionary dictionary = getDictionary(
+ Column.fixedSet(name).getName().getBytes(StandardCharsets.US_ASCII));
+ return new LLSet(dictionary);
+ }
+
+ default LLMap getMap(String name) throws IOException {
+ LLDictionary dictionary = getDictionary(
+ Column.hashMap(name).getName().getBytes(StandardCharsets.US_ASCII));
+ return new LLMap(dictionary);
+ }
+
+ default LLFixedDeepSet getDeepSet(String name, int keySize, int key2Size) throws IOException {
+ LLDeepDictionary deepDictionary = getDeepDictionary(
+ Column.fixedSet(name).getName().getBytes(StandardCharsets.US_ASCII), keySize, key2Size);
+ return new LLFixedDeepSet(deepDictionary);
+ }
+
+ default LLDeepMap getDeepMap(String name, int keySize, int key2Size) throws IOException {
+ LLDeepDictionary deepDictionary = getDeepDictionary(
+ Column.hashMap(name).getName().getBytes(StandardCharsets.US_ASCII), keySize, key2Size);
+ return new LLDeepMap(deepDictionary);
+ }
+
+ default LLInt getInteger(String singletonListName, String name, int defaultValue)
+ throws IOException {
+ LLSingleton singleton = getSingleton(
+ Column.special(singletonListName).getName().getBytes(StandardCharsets.US_ASCII),
+ name.getBytes(StandardCharsets.US_ASCII), Ints.toByteArray(defaultValue));
+ return new LLInt(singleton);
+ }
+
+ default LLLong getLong(String singletonListName, String name, long defaultValue)
+ throws IOException {
+ LLSingleton singleton = getSingleton(
+ Column.special(singletonListName).getName().getBytes(StandardCharsets.US_ASCII),
+ name.getBytes(StandardCharsets.US_ASCII), Longs.toByteArray(defaultValue));
+ return new LLLong(singleton);
+ }
+
+ long getProperty(String propertyName) throws IOException;
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabaseStructure.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabaseStructure.java
new file mode 100644
index 0000000..fe9470d
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabaseStructure.java
@@ -0,0 +1,6 @@
+package it.cavallium.dbengine.database;
+
/** Implemented by every structure that belongs to a named key-value database. */
public interface LLKeyValueDatabaseStructure {

	/** Returns the name of the database this structure belongs to. */
	String getDatabaseName();
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java
new file mode 100644
index 0000000..9078b84
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLLuceneIndex.java
@@ -0,0 +1,55 @@
+package it.cavallium.dbengine.database;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import org.jetbrains.annotations.Nullable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+
/**
 * A Lucene full-text index: documents are addressed by {@link LLTerm} ids and
 * queried with string queries.
 *
 * NOTE(review): several signatures below use raw types (Iterable, Collection,
 * Map, Tuple2); the generic type parameters appear to have been lost in
 * transit — confirm against the original sources before relying on them.
 */
public interface LLLuceneIndex extends Closeable, LLSnapshottable {

	String getLuceneIndexName();

	/** Adds a single document under the given id. */
	void addDocument(LLTerm id, LLDocument doc) throws IOException;

	/** Bulk add; keys and documents are matched positionally. */
	void addDocuments(Iterable keys, Iterable documents) throws IOException;

	void deleteDocument(LLTerm id) throws IOException;

	void updateDocument(LLTerm id, LLDocument document) throws IOException;

	/** Bulk update; ids and documents are matched positionally. */
	void updateDocuments(Iterable ids, Iterable documents) throws IOException;

	/** Removes every document from the index. */
	void deleteAll() throws IOException;

	/**
	 * Runs a query and returns the matching keys.
	 *
	 * @param snapshot read from this snapshot when not null
	 * @param limit maximum number of results
	 * @param sort optional sort order; relevance order when null
	 * @param keyFieldName the stored field holding each document's key
	 */
	Collection search(@Nullable LLSnapshot snapshot, String query, int limit, @Nullable LLSort sort, String keyFieldName)
			throws IOException;

	/** "More like this" search seeded by the given document fields. */
	Collection moreLikeThis(@Nullable LLSnapshot snapshot,
			Map> mltDocumentFields,
			int limit,
			String keyFieldName) throws IOException;

	/**
	 * Streaming variant of {@link #search}.
	 *
	 * @param snapshot read from this snapshot when not null
	 * @param query the query string
	 * @param limit maximum number of results
	 * @param sort optional sort order; relevance order when null
	 * @param keyFieldName the stored field holding each document's key
	 * @return the collection has one or more flux
	 */
	Tuple2, Collection>> searchStream(@Nullable LLSnapshot snapshot,
			String query,
			int limit,
			@Nullable LLSort sort,
			String keyFieldName);

	/** Counts the documents matching the query without materializing them. */
	long count(@Nullable LLSnapshot snapshot, String query) throws IOException;

	boolean isLowMemoryMode();
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLSingleton.java b/src/main/java/it/cavallium/dbengine/database/LLSingleton.java
new file mode 100644
index 0000000..60a7b7d
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLSingleton.java
@@ -0,0 +1,11 @@
+package it.cavallium.dbengine.database;
+
+import java.io.IOException;
+import org.jetbrains.annotations.Nullable;
+
/** A single mutable byte[] value stored in the database. */
public interface LLSingleton extends LLKeyValueDatabaseStructure {

	/** Reads the current value, from the given snapshot when not null. */
	byte[] get(@Nullable LLSnapshot snapshot) throws IOException;

	/** Overwrites the current value. */
	void set(byte[] value) throws IOException;
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLSnapshot.java b/src/main/java/it/cavallium/dbengine/database/LLSnapshot.java
new file mode 100644
index 0000000..f15d37d
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLSnapshot.java
@@ -0,0 +1,41 @@
+package it.cavallium.dbengine.database;
+
+import java.util.StringJoiner;
+
/**
 * An immutable handle to a point-in-time snapshot, identified solely by its
 * sequence number.
 */
public class LLSnapshot {

	private final long sequenceNumber;

	public LLSnapshot(long sequenceNumber) {
		this.sequenceNumber = sequenceNumber;
	}

	public long getSequenceNumber() {
		return sequenceNumber;
	}

	@Override
	public String toString() {
		return "LLSnapshot[sequenceNumber=" + sequenceNumber + "]";
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (obj == null || getClass() != obj.getClass()) {
			return false;
		}
		return sequenceNumber == ((LLSnapshot) obj).sequenceNumber;
	}

	@Override
	public int hashCode() {
		// Identical value to the manual (int) (v ^ (v >>> 32)) fold.
		return Long.hashCode(sequenceNumber);
	}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLSnapshottable.java b/src/main/java/it/cavallium/dbengine/database/LLSnapshottable.java
new file mode 100644
index 0000000..1e5df8a
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLSnapshottable.java
@@ -0,0 +1,10 @@
+package it.cavallium.dbengine.database;
+
+import java.io.IOException;
+
/** Implemented by components that can expose point-in-time snapshots. */
public interface LLSnapshottable {

	/** Creates a snapshot of the current state. */
	LLSnapshot takeSnapshot() throws IOException;

	/** Releases the resources held by a previously taken snapshot. */
	void releaseSnapshot(LLSnapshot snapshot) throws IOException;
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLSort.java b/src/main/java/it/cavallium/dbengine/database/LLSort.java
new file mode 100644
index 0000000..0fbd1a0
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLSort.java
@@ -0,0 +1,64 @@
+package it.cavallium.dbengine.database;
+
+import java.util.Objects;
+
+public class LLSort {
+
+ private final String fieldName;
+ private final LLSortType type;
+ private final boolean reverse;
+
+ public LLSort(String fieldName, LLSortType type, boolean reverse) {
+ this.fieldName = fieldName;
+ this.type = type;
+ this.reverse = reverse;
+ }
+
+ public static LLSort newSortedNumericSortField(String fieldName, boolean reverse) {
+ return new LLSort(fieldName, LLSortType.LONG, reverse);
+ }
+
+ public static LLSort newRandomSortField() {
+ return new LLSort(null, LLSortType.RANDOM, false);
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ public LLSortType getType() {
+ return type;
+ }
+
+ public boolean isReverse() {
+ return reverse;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LLSort llSort = (LLSort) o;
+ return reverse == llSort.reverse &&
+ Objects.equals(fieldName, llSort.fieldName) &&
+ type == llSort.type;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(fieldName, type, reverse);
+ }
+
+ @Override
+ public String toString() {
+ return "LLSort{" +
+ "fieldName='" + fieldName + '\'' +
+ ", type=" + type +
+ ", reverse=" + reverse +
+ '}';
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLSortType.java b/src/main/java/it/cavallium/dbengine/database/LLSortType.java
new file mode 100644
index 0000000..43f5f87
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLSortType.java
@@ -0,0 +1,6 @@
+package it.cavallium.dbengine.database;
+
/** The sort strategies supported by {@code LLSort}. */
public enum LLSortType {
	// sort by a numeric (long) field
	LONG,
	// random ordering
	RANDOM
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLTerm.java b/src/main/java/it/cavallium/dbengine/database/LLTerm.java
new file mode 100644
index 0000000..c529a52
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLTerm.java
@@ -0,0 +1,48 @@
+package it.cavallium.dbengine.database;
+
+import java.util.Objects;
+
/**
 * An immutable (field, text) pair identifying a term in the index.
 */
public class LLTerm {

	private final String key;
	private final String value;

	public LLTerm(String key, String value) {
		this.key = key;
		this.value = value;
	}

	public String getKey() {
		return key;
	}

	public String getValue() {
		return value;
	}

	@Override
	public String toString() {
		return "LLTerm{"
				+ "key='" + key + '\''
				+ ", value='" + value + '\''
				+ '}';
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (obj == null || getClass() != obj.getClass()) {
			return false;
		}
		var other = (LLTerm) obj;
		return Objects.equals(key, other.key)
				&& Objects.equals(value, other.value);
	}

	@Override
	public int hashCode() {
		return Objects.hash(key, value);
	}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLTopKeys.java b/src/main/java/it/cavallium/dbengine/database/LLTopKeys.java
new file mode 100644
index 0000000..6c926e6
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLTopKeys.java
@@ -0,0 +1,51 @@
+package it.cavallium.dbengine.database;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+public class LLTopKeys {
+
+ private final long totalHitsCount;
+ private final LLKeyScore[] hits;
+
+ public LLTopKeys(long totalHitsCount, LLKeyScore[] hits) {
+ this.totalHitsCount = totalHitsCount;
+ this.hits = hits;
+ }
+
+ public long getTotalHitsCount() {
+ return totalHitsCount;
+ }
+
+ public LLKeyScore[] getHits() {
+ return hits;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LLTopKeys llTopKeys = (LLTopKeys) o;
+ return totalHitsCount == llTopKeys.totalHitsCount &&
+ Arrays.equals(hits, llTopKeys.hits);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(totalHitsCount);
+ result = 31 * result + Arrays.hashCode(hits);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "LLTopKeys{" +
+ "totalHitsCount=" + totalHitsCount +
+ ", hits=" + Arrays.toString(hits) +
+ '}';
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLType.java b/src/main/java/it/cavallium/dbengine/database/LLType.java
new file mode 100644
index 0000000..67b248a
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLType.java
@@ -0,0 +1,12 @@
+package it.cavallium.dbengine.database;
+
/**
 * The supported field kinds for document items; each constant mirrors the
 * Lucene field class of the same name (see LLUtils.toField), with the
 * "Stored" variants indexed AND stored.
 */
public enum LLType {
	StringField,
	StringFieldStored,
	IntPoint,
	LongPoint,
	FloatPoint,
	SortedNumericDocValuesField,
	TextField,
	TextFieldStored
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
new file mode 100644
index 0000000..787b519
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
@@ -0,0 +1,200 @@
+package it.cavallium.dbengine.database;
+
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+import com.google.protobuf.ByteString;
+import it.cavallium.dbengine.database.utils.RandomSortField;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.FloatPoint;
+import org.apache.lucene.document.IntPoint;
+import org.apache.lucene.document.LongPoint;
+import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.SortedNumericSortField;
+import org.jetbrains.annotations.Nullable;
+import it.cavallium.dbengine.proto.LLKeyScore;
+import it.cavallium.dbengine.proto.LLType;
+
/**
 * Static conversion helpers between the local model classes
 * (it.cavallium.dbengine.database.*), Lucene document/query types, and the
 * protobuf-generated gRPC types (it.cavallium.dbengine.proto.*).
 *
 * NOTE(review): the imports of proto LLKeyScore and LLType shadow the local
 * classes of the same simple names, so unqualified LLKeyScore/LLType in this
 * file refer to the proto types.
 *
 * NOTE(review): several signatures use raw collection types; the generic
 * type parameters appear to have been stripped in transit — confirm against
 * the original sources.
 */
public class LLUtils {

	// Shared single-byte buffers encoding boolean responses; callers must not
	// mutate the returned arrays.
	private static final byte[] RESPONSE_TRUE = new byte[]{1};
	private static final byte[] RESPONSE_FALSE = new byte[]{0};

	/** Decodes a boolean encoded by {@link #booleanToResponse(boolean)}. */
	public static boolean responseToBoolean(byte[] response) {
		return response[0] == 1;
	}

	/** Encodes a boolean as a shared single-byte response buffer. */
	public static byte[] booleanToResponse(boolean bool) {
		return bool ? RESPONSE_TRUE : RESPONSE_FALSE;
	}

	/**
	 * Converts the local sort descriptor to a Lucene {@link Sort}.
	 * Returns null for a null input or an unrecognized sort type.
	 */
	@Nullable
	public static Sort toSort(@Nullable LLSort sort) {
		if (sort == null) {
			return null;
		}
		if (sort.getType() == LLSortType.LONG) {
			return new Sort(new SortedNumericSortField(sort.getFieldName(), SortField.Type.LONG, sort.isReverse()));
		} else if (sort.getType() == LLSortType.RANDOM) {
			return new Sort(new RandomSortField());
		}
		return null;
	}

	/** Converts a local term to a Lucene {@link Term}. */
	public static Term toTerm(LLTerm term) {
		return new Term(term.getKey(), term.getValue());
	}

	/** Converts a local document to a Lucene {@link Document}, field by field. */
	public static Document toDocument(LLDocument document) {
		Document d = new Document();
		for (LLItem item : document.getItems()) {
			d.add(LLUtils.toField(item));
		}
		return d;
	}

	/** Converts a batch of local documents to Lucene documents. */
	public static Iterable toDocuments(Iterable document) {
		List d = new LinkedList<>();
		for (LLDocument doc : document) {
			d.add(LLUtils.toDocument(doc));
		}
		return d;
	}

	/** Converts a batch of local terms to Lucene terms. */
	public static Iterable toTerms(Iterable terms) {
		List d = new LinkedList<>();
		for (LLTerm term : terms) {
			d.add(LLUtils.toTerm(term));
		}
		return d;
	}

	/**
	 * Maps one local item to the corresponding Lucene field; numeric types are
	 * decoded from the item's byte payload (big-endian, per Guava Ints/Longs).
	 */
	private static IndexableField toField(LLItem item) {
		switch (item.getType()) {
			case IntPoint:
				return new IntPoint(item.getName(), Ints.fromByteArray(item.getData()));
			case LongPoint:
				return new LongPoint(item.getName(), Longs.fromByteArray(item.getData()));
			case FloatPoint:
				return new FloatPoint(item.getName(), ByteBuffer.wrap(item.getData()).getFloat());
			case TextField:
				return new TextField(item.getName(), item.stringValue(), Field.Store.NO);
			case TextFieldStored:
				return new TextField(item.getName(), item.stringValue(), Field.Store.YES);
			case SortedNumericDocValuesField:
				return new SortedNumericDocValuesField(item.getName(), Longs.fromByteArray(item.getData()));
			case StringField:
				return new StringField(item.getName(), item.stringValue(), Field.Store.NO);
			case StringFieldStored:
				return new StringField(item.getName(), item.stringValue(), Field.Store.YES);
		}
		throw new UnsupportedOperationException("Unsupported field type");
	}

	// NOTE(review): the return type below is syntactically mangled — it was
	// probably "Iterable<? extends it.cavallium.dbengine.proto.LLItem>".
	/** Converts an array of local items to proto items. */
	public static Iterable extends it.cavallium.dbengine.proto.LLItem> toGrpc(LLItem[] items) {
		var list = new ArrayList(items.length);
		for (LLItem item : items) {
			list.add(LLUtils.toGrpc(item));
		}
		return list;
	}

	/** Converts one local item to its proto form; data2 is set only when present. */
	public static it.cavallium.dbengine.proto.LLItem toGrpc(LLItem item) {
		var builder = it.cavallium.dbengine.proto.LLItem.newBuilder()
				.setType(LLType.valueOf(item.getType().toString()))
				.setName(item.getName())
				.setData1(ByteString.copyFrom(item.getData()));
		if (item.getData2() != null) {
			builder.setData2(ByteString.copyFrom(item.getData2()));
		}
		return builder.build();
	}

	/** Converts a local document to its proto form. */
	public static it.cavallium.dbengine.proto.LLDocument toGrpc(LLDocument doc) {
		var builder = it.cavallium.dbengine.proto.LLDocument.newBuilder()
				.addAllItems(toGrpc(doc.getItems()));
		return builder.build();
	}

	/** Converts a batch of local documents to their proto form. */
	public static Iterable toGrpc(
			Iterable document) {
		LinkedList docs = new LinkedList<>();
		document.forEach((doc) -> docs.add(toGrpc(doc)));
		return docs;
	}

	/** Converts a batch of local terms (keys) to their proto form. */
	public static Iterable toGrpcKey(Iterable term) {
		LinkedList terms = new LinkedList<>();
		term.forEach((t) -> terms.add(toGrpc(t)));
		return terms;
	}

	/** Converts a local term to its proto form. */
	public static it.cavallium.dbengine.proto.LLTerm toGrpc(LLTerm term) {
		return it.cavallium.dbengine.proto.LLTerm.newBuilder()
				.setKey(term.getKey())
				.setValue(term.getValue())
				.build();
	}

	// NOTE(review): getFieldName() is null for LLSort.newRandomSortField(), and
	// protobuf setters reject null, so this would throw for random sorts —
	// confirm random sorts are never sent over gRPC.
	public static it.cavallium.dbengine.proto.LLSort toGrpc(LLSort sort) {
		return it.cavallium.dbengine.proto.LLSort.newBuilder()
				.setFieldName(sort.getFieldName())
				.setType(it.cavallium.dbengine.proto.LLSortType.valueOf(sort.getType().toString()))
				.setReverse(sort.isReverse())
				.build();
	}

	/** Converts a proto key-score hit to the local model class. */
	public static it.cavallium.dbengine.database.LLKeyScore toKeyScore(LLKeyScore hit) {
		return new it.cavallium.dbengine.database.LLKeyScore(hit.getKey(), hit.getScore());
	}

	/** Builds a local document from a list of proto items. */
	public static LLDocument toLocal(List documentItemsList) {
		return new LLDocument(documentItemsList.stream().map(LLUtils::toLocal).toArray(LLItem[]::new));
	}

	/** Converts a proto document to the local model class. */
	public static LLDocument toLocal(it.cavallium.dbengine.proto.LLDocument document) {
		return toLocal(document.getItemsList());
	}

	/** Converts a batch of proto documents to local documents. */
	public static List toLocalDocuments(
			List documentItemsList) {
		return documentItemsList.stream().map(LLUtils::toLocal).collect(Collectors.toList());
	}

	/** Converts a batch of proto terms to local terms. */
	public static List toLocalTerms(List termItemsList) {
		return termItemsList.stream().map(LLUtils::toLocal).collect(Collectors.toList());
	}

	// NOTE(review): protobuf message getters normally never return null (they
	// return an empty ByteString), so this null check is likely always true and
	// data2 may become an empty array instead of null — confirm intent.
	private static LLItem toLocal(it.cavallium.dbengine.proto.LLItem item) {
		var data2 = item.getData2() != null ? item.getData2().toByteArray() : null;
		return new LLItem(it.cavallium.dbengine.database.LLType.valueOf(item.getType().toString()),
				item.getName(), item.getData1().toByteArray(), data2);
	}

	/** Converts a proto term to the local model class. */
	public static LLTerm toLocal(it.cavallium.dbengine.proto.LLTerm key) {
		return new LLTerm(key.getKey(), key.getValue());
	}

	/** Converts a proto sort descriptor to the local model class. */
	public static LLSort toLocal(it.cavallium.dbengine.proto.LLSort sort) {
		return new LLSort(sort.getFieldName(), LLSortType.valueOf(sort.getType().toString()),
				sort.getReverse());
	}

	/** Converts a local key-score hit to its proto form. */
	public static LLKeyScore toGrpc(it.cavallium.dbengine.database.LLKeyScore hit) {
		return LLKeyScore.newBuilder()
				.setKey(hit.getKey())
				.setScore(hit.getScore())
				.build();
	}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/LuceneUtils.java b/src/main/java/it/cavallium/dbengine/database/LuceneUtils.java
new file mode 100644
index 0000000..4929b35
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/LuceneUtils.java
@@ -0,0 +1,60 @@
+package it.cavallium.dbengine.database;
+
+import it.cavallium.dbengine.database.analyzer.N4CharGramAnalyzer;
+import it.cavallium.dbengine.database.analyzer.N4CharGramEdgeAnalyzer;
+import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
+import it.cavallium.dbengine.database.analyzer.WordAnalyzer;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.LowerCaseFilter;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.en.EnglishPossessiveFilter;
+import org.apache.lucene.analysis.en.KStemFilter;
+import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter;
+
+public class LuceneUtils {
+ private static final Analyzer lucene4CharGramAnalyzerEdgeInstance = new N4CharGramEdgeAnalyzer();
+ private static final Analyzer lucene4CharGramAnalyzerInstance = new N4CharGramAnalyzer();
+ private static final Analyzer luceneWordAnalyzerStopWordsAndStemInstance = new WordAnalyzer(true, true);
+ private static final Analyzer luceneWordAnalyzerStopWordsInstance = new WordAnalyzer(true, false);
+ private static final Analyzer luceneWordAnalyzerStemInstance = new WordAnalyzer(false, true);
+ private static final Analyzer luceneWordAnalyzerSimpleInstance = new WordAnalyzer(false, false);
+
+ public static Analyzer getAnalyzer(TextFieldsAnalyzer analyzer) {
+ switch (analyzer) {
+ case PartialWordsEdge:
+ return lucene4CharGramAnalyzerEdgeInstance;
+ case PartialWords:
+ return lucene4CharGramAnalyzerInstance;
+ case FullText:
+ return luceneWordAnalyzerStopWordsAndStemInstance;
+ case WordWithStopwordsStripping:
+ return luceneWordAnalyzerStopWordsInstance;
+ case WordWithStemming:
+ return luceneWordAnalyzerStemInstance;
+ case WordSimple:
+ return luceneWordAnalyzerSimpleInstance;
+ default:
+ throw new UnsupportedOperationException("Unknown analyzer: " + analyzer);
+ }
+ }
+
+ /**
+ *
+ * @param stem Enable stem filters on words.
+ * Pass false if it will be used with a n-gram filter
+ */
+ public static TokenStream newCommonFilter(TokenStream tokenStream, boolean stem) {
+ tokenStream = newCommonNormalizer(tokenStream);
+ if (stem) {
+ tokenStream = new KStemFilter(tokenStream);
+ tokenStream = new EnglishPossessiveFilter(tokenStream);
+ }
+ return tokenStream;
+ }
+
+ public static TokenStream newCommonNormalizer(TokenStream tokenStream) {
+ tokenStream = new ASCIIFoldingFilter(tokenStream);
+ tokenStream = new LowerCaseFilter(tokenStream);
+ return tokenStream;
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/analyzer/N4CharGramAnalyzer.java b/src/main/java/it/cavallium/dbengine/database/analyzer/N4CharGramAnalyzer.java
new file mode 100644
index 0000000..017a308
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/analyzer/N4CharGramAnalyzer.java
@@ -0,0 +1,32 @@
+package it.cavallium.dbengine.database.analyzer;
+
+import it.cavallium.dbengine.database.LuceneUtils;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.Tokenizer;
+import org.apache.lucene.analysis.core.KeywordTokenizer;
+import org.apache.lucene.analysis.ngram.NGramTokenFilter;
+
+public class N4CharGramAnalyzer extends Analyzer {
+
+ public N4CharGramAnalyzer() {
+
+ }
+
+ @Override
+ protected TokenStreamComponents createComponents(final String fieldName) {
+ Tokenizer tokenizer = new KeywordTokenizer();
+ TokenStream tokenStream = tokenizer;
+ tokenStream = LuceneUtils.newCommonFilter(tokenStream, false);
+ tokenStream = new NGramTokenFilter(tokenStream, 4, 4, false);
+
+ return new TokenStreamComponents(tokenizer, tokenStream);
+ }
+
+ @Override
+ protected TokenStream normalize(String fieldName, TokenStream in) {
+ TokenStream tokenStream = in;
+ tokenStream = LuceneUtils.newCommonNormalizer(tokenStream);
+ return tokenStream;
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/analyzer/N4CharGramEdgeAnalyzer.java b/src/main/java/it/cavallium/dbengine/database/analyzer/N4CharGramEdgeAnalyzer.java
new file mode 100644
index 0000000..2e60676
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/analyzer/N4CharGramEdgeAnalyzer.java
@@ -0,0 +1,32 @@
+package it.cavallium.dbengine.database.analyzer;
+
+import it.cavallium.dbengine.database.LuceneUtils;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.Tokenizer;
+import org.apache.lucene.analysis.core.KeywordTokenizer;
+import org.apache.lucene.analysis.ngram.EdgeNGramTokenFilter;
+
+public class N4CharGramEdgeAnalyzer extends Analyzer {
+
+ public N4CharGramEdgeAnalyzer() {
+
+ }
+
+ @Override
+ protected TokenStreamComponents createComponents(final String fieldName) {
+ Tokenizer tokenizer = new KeywordTokenizer();
+ TokenStream tokenStream = tokenizer;
+ tokenStream = LuceneUtils.newCommonFilter(tokenStream, false);
+ tokenStream = new EdgeNGramTokenFilter(tokenStream, 4, 4, false);
+
+ return new TokenStreamComponents(tokenizer, tokenStream);
+ }
+
+ @Override
+ protected TokenStream normalize(String fieldName, TokenStream in) {
+ TokenStream tokenStream = in;
+ tokenStream = LuceneUtils.newCommonNormalizer(tokenStream);
+ return tokenStream;
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/analyzer/TextFieldsAnalyzer.java b/src/main/java/it/cavallium/dbengine/database/analyzer/TextFieldsAnalyzer.java
new file mode 100644
index 0000000..ee24de9
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/analyzer/TextFieldsAnalyzer.java
@@ -0,0 +1,10 @@
+package it.cavallium.dbengine.database.analyzer;
+
/**
 * Selects which analyzer {@code LuceneUtils.getAnalyzer} returns for text
 * fields.
 */
public enum TextFieldsAnalyzer {
	// 4-char edge n-grams: prefix-style partial word matching
	PartialWordsEdge,
	// 4-char n-grams: partial word matching anywhere in the input
	PartialWords,
	// plain word tokenization, no stopword removal or stemming
	WordSimple,
	// word tokenization with stopwords removed (EnglishItalianStopFilter)
	WordWithStopwordsStripping,
	// word tokenization with stemming
	WordWithStemming,
	// word tokenization with both stopword removal and stemming
	FullText,
}
diff --git a/src/main/java/it/cavallium/dbengine/database/analyzer/WordAnalyzer.java b/src/main/java/it/cavallium/dbengine/database/analyzer/WordAnalyzer.java
new file mode 100644
index 0000000..aaa9af0
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/analyzer/WordAnalyzer.java
@@ -0,0 +1,39 @@
+package it.cavallium.dbengine.database.analyzer;
+
+import it.cavallium.dbengine.database.EnglishItalianStopFilter;
+import it.cavallium.dbengine.database.LuceneUtils;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.Tokenizer;
+import org.apache.lucene.analysis.standard.StandardTokenizer;
+
+public class WordAnalyzer extends Analyzer {
+
+ private final boolean removeStopWords;
+ private final boolean stem;
+
+ public WordAnalyzer(boolean removeStopWords, boolean stem) {
+ this.removeStopWords = removeStopWords;
+ this.stem = stem;
+ }
+
+ @Override
+ protected TokenStreamComponents createComponents(final String fieldName) {
+ Tokenizer tokenizer = new StandardTokenizer();
+ TokenStream tokenStream = tokenizer;
+ //tokenStream = new LengthFilter(tokenStream, 1, 100);
+ if (removeStopWords) {
+ tokenStream = new EnglishItalianStopFilter(tokenStream);
+ }
+ tokenStream = LuceneUtils.newCommonFilter(tokenStream, stem);
+
+ return new TokenStreamComponents(tokenizer, tokenStream);
+ }
+
+ @Override
+ protected TokenStream normalize(String fieldName, TokenStream in) {
+ TokenStream tokenStream = in;
+ tokenStream = LuceneUtils.newCommonNormalizer(tokenStream);
+ return tokenStream;
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java b/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java
new file mode 100644
index 0000000..bebc6ae
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/CappedWriteBatch.java
@@ -0,0 +1,186 @@
+package it.cavallium.dbengine.database.disk;
+
+import java.nio.ByteBuffer;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteBatchInterface;
+import org.rocksdb.WriteOptions;
+import org.warp.commonutils.concurrency.atomicity.NotAtomic;
+
/**
 * A {@link WriteBatchInterface} wrapper that caps the number of operations
 * buffered in memory: after each mutation, if the internal batch holds at
 * least {@code cap} operations it is written to the database and cleared.
 *
 * Because the buffered operations may be committed in several separate
 * writes, the overall sequence is NOT applied as one atomic write (hence the
 * {@code @NotAtomic} marker).
 */
@NotAtomic
public class CappedWriteBatch implements WriteBatchInterface, AutoCloseable {

	private final RocksDB db;
	private final int cap;
	private final WriteOptions writeOptions;

	private final WriteBatch writeBatch;

	/**
	 * @param cap The limit of operations
	 * @param reservedWriteBatchSize initial byte size reserved for the internal batch
	 * @param maxWriteBatchSize maximum byte size of the internal batch
	 *        (enforced by {@code WriteBatch.setMaxBytes})
	 */
	public CappedWriteBatch(RocksDB db, int cap, int reservedWriteBatchSize, long maxWriteBatchSize, WriteOptions writeOptions) {
		this.db = db;
		this.cap = cap;
		this.writeOptions = writeOptions;
		this.writeBatch = new WriteBatch(reservedWriteBatchSize);
		this.writeBatch.setMaxBytes(maxWriteBatchSize);
	}

	// Writes the batch to the database and clears it once it holds `cap`
	// operations, or whenever it is non-empty if `force` is true.
	private void flushIfNeeded(boolean force) throws RocksDBException {
		if (this.writeBatch.count() >= (force ? 1 : cap)) {
			db.write(writeOptions, this.writeBatch);
			this.writeBatch.clear();
		}
	}

	@Override
	public int count() {
		return writeBatch.count();
	}

	// Every mutation below delegates to the internal WriteBatch and then
	// flushes if the cap has been reached.

	@Override
	public void put(byte[] key, byte[] value) throws RocksDBException {
		writeBatch.put(key, value);
		flushIfNeeded(false);
	}

	@Override
	public void put(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException {
		writeBatch.put(columnFamilyHandle, key, value);
		flushIfNeeded(false);
	}

	@Override
	public void put(ByteBuffer key, ByteBuffer value) throws RocksDBException {
		writeBatch.put(key, value);
		flushIfNeeded(false);
	}

	@Override
	public void put(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key, ByteBuffer value) throws RocksDBException {
		writeBatch.put(columnFamilyHandle, key, value);
		flushIfNeeded(false);
	}

	@Override
	public void merge(byte[] key, byte[] value) throws RocksDBException {
		writeBatch.merge(key, value);
		flushIfNeeded(false);
	}

	@Override
	public void merge(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException {
		writeBatch.merge(columnFamilyHandle, key, value);
		flushIfNeeded(false);
	}

	@Override
	public void remove(byte[] key) throws RocksDBException {
		writeBatch.remove(key);
		flushIfNeeded(false);
	}

	@Override
	public void remove(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
		writeBatch.remove(columnFamilyHandle, key);
		flushIfNeeded(false);
	}

	@Override
	public void delete(byte[] key) throws RocksDBException {
		writeBatch.delete(key);
		flushIfNeeded(false);
	}

	@Override
	public void delete(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
		writeBatch.delete(columnFamilyHandle, key);
		flushIfNeeded(false);
	}

	@Override
	public void singleDelete(byte[] key) throws RocksDBException {
		writeBatch.singleDelete(key);
		flushIfNeeded(false);
	}

	@Override
	public void singleDelete(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
		writeBatch.singleDelete(columnFamilyHandle, key);
		flushIfNeeded(false);
	}

	@Override
	public void remove(ByteBuffer key) throws RocksDBException {
		writeBatch.remove(key);
		flushIfNeeded(false);
	}

	@Override
	public void remove(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key) throws RocksDBException {
		writeBatch.remove(columnFamilyHandle, key);
		flushIfNeeded(false);
	}

	@Override
	public void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDBException {
		writeBatch.deleteRange(beginKey, endKey);
		flushIfNeeded(false);
	}

	@Override
	public void deleteRange(ColumnFamilyHandle columnFamilyHandle, byte[] beginKey, byte[] endKey)
			throws RocksDBException {
		writeBatch.deleteRange(columnFamilyHandle, beginKey, endKey);
		flushIfNeeded(false);
	}

	@Override
	public void putLogData(byte[] blob) throws RocksDBException {
		writeBatch.putLogData(blob);
		flushIfNeeded(false);
	}

	@Override
	public void clear() {
		writeBatch.clear();
	}

	// NOTE(review): savepoints only cover operations still buffered in the
	// internal batch; anything already flushed to the database cannot be
	// rolled back — confirm callers account for this.
	@Override
	public void setSavePoint() {
		writeBatch.setSavePoint();
	}

	@Override
	public void rollbackToSavePoint() throws RocksDBException {
		writeBatch.rollbackToSavePoint();
	}

	@Override
	public void popSavePoint() throws RocksDBException {
		writeBatch.popSavePoint();
	}

	@Override
	public void setMaxBytes(long maxBytes) {
		writeBatch.setMaxBytes(maxBytes);
	}

	/** Exposes the internal batch; mutations through it bypass the cap logic. */
	@Override
	public WriteBatch getWriteBatch() {
		return writeBatch;
	}

	/**
	 * Writes any remaining buffered operations to the database.
	 * NOTE(review): despite the name this does NOT close the batch —
	 * {@link #close()} must apparently still be called (e.g. via
	 * try-with-resources); confirm intent.
	 */
	public void writeToDbAndClose() throws RocksDBException {
		flushIfNeeded(true);
	}

	@Override
	public void close() {
		writeBatch.close();
	}
}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java
new file mode 100644
index 0000000..f2d42a4
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDatabaseConnection.java
@@ -0,0 +1,78 @@
+package it.cavallium.dbengine.database.disk;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+import it.cavallium.dbengine.database.Column;
+import it.cavallium.dbengine.database.LLDatabaseConnection;
+import it.cavallium.dbengine.database.LLLuceneIndex;
+import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
+
+public class LLLocalDatabaseConnection implements LLDatabaseConnection {
+
+ // Root directory containing every database and Lucene index of this connection.
+ private final Path basePath;
+ // Whether to crash instead of attempting recovery when the RocksDB WAL is corrupted.
+ private final boolean crashIfWalError;
+
+ public LLLocalDatabaseConnection(Path basePath, boolean crashIfWalError) {
+  this.basePath = basePath;
+  this.crashIfWalError = crashIfWalError;
+ }
+
+ /** Ensures the base directory exists; a local connection has no remote handshake. */
+ @Override
+ public void connect() throws IOException {
+  if (Files.notExists(basePath)) {
+   Files.createDirectories(basePath);
+  }
+ }
+
+ /**
+  * Opens (creating if absent) the key-value database {@code name} under
+  * {@code basePath/database_<name>}.
+  *
+  * @param columns column families to open
+  * @param lowMemory true to tune RocksDB for low-memory environments
+  */
+ @Override
+ public LLLocalKeyValueDatabase getDatabase(String name, List<Column> columns, boolean lowMemory) throws IOException {
+  return new LLLocalKeyValueDatabase(name, basePath.resolve("database_" + name), columns, new LinkedList<>(),
+    crashIfWalError, lowMemory);
+ }
+
+ /**
+  * Opens a Lucene index under {@code basePath/lucene}: a single local index when
+  * {@code instancesCount == 1}, otherwise a sharded multi-index.
+  */
+ @Override
+ public LLLuceneIndex getLuceneIndex(String name,
+   int instancesCount,
+   TextFieldsAnalyzer textFieldsAnalyzer,
+   Duration queryRefreshDebounceTime,
+   Duration commitDebounceTime,
+   boolean lowMemory) throws IOException {
+  if (instancesCount != 1) {
+   return new LLLocalMultiLuceneIndex(basePath.resolve("lucene"),
+     name,
+     instancesCount,
+     textFieldsAnalyzer,
+     queryRefreshDebounceTime,
+     commitDebounceTime,
+     lowMemory
+   );
+  } else {
+   return new LLLocalLuceneIndex(basePath.resolve("lucene"),
+     name,
+     textFieldsAnalyzer,
+     queryRefreshDebounceTime,
+     commitDebounceTime,
+     lowMemory
+   );
+  }
+ }
+
+ /** No-op: databases and indices obtained from this connection are closed individually. */
+ @Override
+ public void disconnect() throws IOException {
+
+ }
+
+ /** No-op: a local connection is always reachable. */
+ @Override
+ public void ping() {
+
+ }
+
+ /** Local access has no network latency to measure. */
+ @Override
+ public double getMediumLatencyMillis() {
+  return 0;
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDeepDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDeepDictionary.java
new file mode 100644
index 0000000..9280822
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDeepDictionary.java
@@ -0,0 +1,865 @@
+package it.cavallium.dbengine.database.disk;
+
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Holder;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+import org.rocksdb.WriteBatchInterface;
+import org.warp.commonutils.concurrency.atomicity.NotAtomic;
+import org.warp.commonutils.error.IndexOutOfBoundsException;
+import org.warp.commonutils.functional.TriConsumer;
+import org.warp.commonutils.functional.TriFunction;
+import org.warp.commonutils.type.Bytes;
+import org.warp.commonutils.type.UnmodifiableIterableMap;
+import org.warp.commonutils.type.UnmodifiableMap;
+import it.cavallium.dbengine.database.LLDeepDictionary;
+import it.cavallium.dbengine.database.LLDictionaryResultType;
+import it.cavallium.dbengine.database.LLSnapshot;
+import it.cavallium.dbengine.database.LLUtils;
+
+@NotAtomic
+public class LLLocalDeepDictionary implements LLDeepDictionary {
+
+ private static final byte[] NO_DATA = new byte[0];
+ private static final byte[][] NO_DATA_MAP = new byte[0][0];
+ private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions();
+ private final RocksDB db;
+ private final ColumnFamilyHandle cfh;
+ private final String databaseName;
+ private final Function snapshotResolver;
+ private final int key1Size;
+ private final int key2Size;
+ private final int key1Position;
+ private final int key2Position;
+ private final int key1EndPosition;
+ private final int key2EndPosition;
+ private final int combinedKeySize;
+
+ /**
+  * Creates a deep dictionary view over one column family, where each stored key is the
+  * concatenation of a fixed-size key1 ({@code keySize} bytes) and key2 ({@code key2Size} bytes).
+  * NOTE(review): {@code snapshotResolver} is presumably Function&lt;LLSnapshot, Snapshot&gt; —
+  * generic parameters appear stripped from this source; confirm against the original.
+  */
+ public LLLocalDeepDictionary(@NotNull RocksDB db, @NotNull ColumnFamilyHandle columnFamilyHandle,
+ String databaseName,
+ Function snapshotResolver, int keySize, int key2Size) {
+ Objects.requireNonNull(db);
+ this.db = db;
+ Objects.requireNonNull(columnFamilyHandle);
+ this.cfh = columnFamilyHandle;
+ this.databaseName = databaseName;
+ this.snapshotResolver = snapshotResolver;
+ this.key1Size = keySize;
+ this.key2Size = key2Size;
+ // key1 occupies [0, key1Size), key2 occupies [key1Size, key1Size + key2Size)
+ this.key1Position = 0;
+ this.key2Position = key1Size;
+ this.key1EndPosition = key1Position + key1Size;
+ this.key2EndPosition = key2Position + key2Size;
+ this.combinedKeySize = keySize + key2Size;
+ }
+
+ /** Returns the database name this dictionary belongs to. */
+ @Override
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ /**
+  * Tells whether {@code combinedKey} belongs to the {@code key1} group, i.e. whether its
+  * first {@code key1Size} bytes equal {@code key1}. Null or wrongly sized arguments are
+  * never considered sub-keys.
+  */
+ @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+ private boolean isSubKey(byte[] key1, byte[] combinedKey) {
+  if (key1 == null || combinedKey == null) {
+   return false;
+  }
+  boolean sizesValid = key1.length == key1Size && combinedKey.length == combinedKeySize;
+  return sizesValid && Arrays.equals(key1, 0, key1Size, combinedKey, key1Position, key1EndPosition);
+ }
+
+ /** Returns the smallest combined key of the {@code key1} group: key1 followed by key2Size zero bytes. */
+ private byte[] getStartSeekKey(byte[] key1) {
+ if (key1.length != key1Size) {
+ throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
+ }
+ return Arrays.copyOf(key1, combinedKeySize);
+ }
+
+ /**
+  * Returns the largest combined key of the {@code key1} group: key1 followed by key2Size 0xFF bytes.
+  * NOTE(review): a real key2 consisting entirely of 0xFF bytes equals this bound, so callers using it
+  * as an exclusive end (e.g. deleteRange) would miss such an entry — confirm key2 values exclude all-0xFF.
+  */
+ private byte[] getEndSeekKey(byte[] key1) {
+ if (key1.length != key1Size) {
+ throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
+ }
+ byte[] endSeekKey = Arrays.copyOf(key1, combinedKeySize);
+ Arrays.fill(endSeekKey, key2Position, key2EndPosition, (byte) 0xFF);
+ return endSeekKey;
+ }
+
+ /** Extracts the key1 portion (first key1Size bytes) of a combined key, validating its length. */
+ @NotNull
+ private byte[] getKey1(@NotNull byte[] combinedKey) {
+ if (combinedKey.length != combinedKeySize) {
+ throw new IndexOutOfBoundsException(combinedKey.length, combinedKeySize, combinedKeySize);
+ }
+ return Arrays.copyOfRange(combinedKey, key1Position, key1EndPosition);
+ }
+
+ /** Extracts the key2 portion (last key2Size bytes) of a combined key. Unlike getKey1, does not validate length. */
+ @NotNull
+ private byte[] getKey2(@NotNull byte[] combinedKey) {
+ return Arrays.copyOfRange(combinedKey, key2Position, key2EndPosition);
+ }
+
+ /** Concatenates key1 and key2 into the combined on-disk key, validating both lengths. */
+ @NotNull
+ private byte[] getCombinedKey(@NotNull byte[] key1, @NotNull byte[] key2) {
+ if (key1.length != key1Size) {
+ throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
+ }
+ if (key2.length != key2Size) {
+ throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size);
+ }
+ var combinedKey = new byte[combinedKeySize];
+ System.arraycopy(key1, 0, combinedKey, key1Position, key1Size);
+ System.arraycopy(key2, 0, combinedKey, key2Position, key2Size);
+ return combinedKey;
+ }
+
+ /**
+  * Maps an optional logical snapshot to ReadOptions: a fresh snapshot-bound instance, or the shared default.
+  * NOTE(review): each non-null call allocates a new native ReadOptions that no caller closes — potential
+  * native-memory leak; consider closing it after use.
+  */
+ private ReadOptions resolveSnapshot(LLSnapshot snapshot) {
+ if (snapshot != null) {
+ return new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot));
+ } else {
+ return EMPTY_READ_OPTIONS;
+ }
+ }
+
+ /**
+  * Returns every (key2, value) pair stored under {@code key1} as an unmodifiable map.
+  * Iterates from the first combined key with prefix key1 until the prefix no longer matches.
+  */
+ @Override
+ public UnmodifiableIterableMap<byte[], byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
+  if (key.length != key1Size) {
+   throw new IndexOutOfBoundsException(key.length, key1Size, key1Size);
+  }
+  // Restored element types: raw lists made toArray(byte[][]::new) ill-typed.
+  ObjectArrayList<byte[]> keys = new ObjectArrayList<>();
+  ObjectArrayList<byte[]> values = new ObjectArrayList<>();
+  try (var iterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
+   // Seeking the key1 prefix positions at the first combined key >= key1
+   iterator.seek(key);
+   while (iterator.isValid()) {
+
+    byte[] combinedKey = iterator.key();
+
+    if (!isSubKey(key, combinedKey)) {
+     break;
+    }
+
+    byte[] key2 = getKey2(combinedKey);
+    byte[] value = iterator.value();
+    keys.add(key2);
+    values.add(value);
+
+    iterator.next();
+   }
+  }
+
+  return UnmodifiableIterableMap.of(keys.toArray(byte[][]::new), values.toArray(byte[][]::new));
+ }
+
+ /**
+  * Returns the value stored under (key1, key2), or empty if absent.
+  * Uses keyMayExist as a bloom-filter fast path before the definitive get.
+  */
+ @Override
+ public Optional<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException {
+  if (key1.length != key1Size) {
+   throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
+  }
+  if (key2.length != key2Size) {
+   throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size);
+  }
+  try {
+   // Restored Holder<byte[]>: the raw type made getValue() return Object.
+   Holder<byte[]> data = new Holder<>();
+   byte[] combinedKey = getCombinedKey(key1, key2);
+   if (db.keyMayExist(cfh, resolveSnapshot(snapshot), combinedKey, data)) {
+    if (data.getValue() != null) {
+     // keyMayExist already fetched the value
+     return Optional.of(data.getValue());
+    } else {
+     // May-exist was inconclusive: perform the definitive lookup
+     byte[] value = db.get(cfh, resolveSnapshot(snapshot), combinedKey);
+     return Optional.ofNullable(value);
+    }
+   } else {
+    return Optional.empty();
+   }
+  } catch (RocksDBException e) {
+   throw new IOException(e);
+  }
+ }
+
+ /** Returns true if no entry exists under {@code key1}: seeks the group start and checks the first hit's prefix. */
+ @Override
+ public boolean isEmpty(@Nullable LLSnapshot snapshot, byte[] key1) {
+ if (key1.length != key1Size) {
+ throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
+ }
+ byte[] startSeekKey = getStartSeekKey(key1);
+ try (var iterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
+ iterator.seek(startSeekKey);
+ if (!iterator.isValid()) {
+ return true;
+ }
+ byte[] startKey = iterator.key();
+ return !isSubKey(key1, startKey);
+ }
+ }
+
+ /**
+  * Returns true if an entry exists under (key1, key2).
+  * keyMayExist is used as a fast path; on an inconclusive answer the value size is fetched.
+  */
+ @Override
+ public boolean contains(@Nullable LLSnapshot snapshot, byte[] key1, byte[] key2) throws IOException {
+  if (key1.length != key1Size) {
+   throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
+  }
+  if (key2.length != key2Size) {
+   throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size);
+  }
+  try {
+   var combinedKey = getCombinedKey(key1, key2);
+   int size = RocksDB.NOT_FOUND;
+   // Restored Holder<byte[]>: the raw type made getValue() return Object (no .length).
+   Holder<byte[]> data = new Holder<>();
+   if (db.keyMayExist(cfh, resolveSnapshot(snapshot), combinedKey, data)) {
+    if (data.getValue() != null) {
+     size = data.getValue().length;
+    } else {
+     size = db.get(cfh, resolveSnapshot(snapshot), combinedKey, NO_DATA);
+    }
+   }
+   return size != RocksDB.NOT_FOUND;
+  } catch (RocksDBException e) {
+   throw new IOException(e);
+  }
+ }
+
+ //todo: use WriteBatch to enhance performance
+ /**
+  * Replaces the whole {@code key1} group with {@code value}: existing entries missing from the
+  * new map are deleted, existing entries present in it are overwritten in place, and remaining
+  * new entries are inserted afterwards.
+  */
+ @Override
+ public void put(byte[] key1, UnmodifiableIterableMap<byte[], byte[]> value) throws IOException {
+  if (key1.length != key1Size) {
+   throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
+  }
+  try {
+   var bytesValue = Bytes.ofMap(value);
+   // Bytes wrappers give byte[] keys value-based equals/hashCode for set membership.
+   var alreadyEditedKeys = new ObjectOpenHashSet<Bytes>();
+
+   // Delete old keys and change keys that are already present
+   try (var iterator = db.newIterator(cfh)) {
+    iterator.seek(getStartSeekKey(key1));
+    while (iterator.isValid()) {
+     byte[] combinedKey = iterator.key();
+
+     if (!isSubKey(key1, combinedKey)) {
+      // The key is outside of key1: exit from the iteration
+      break;
+     }
+
+     byte[] key2 = getKey2(combinedKey);
+     var valueToSetHere = bytesValue.get(key2);
+     if (valueToSetHere == null) {
+      // key not present in the new data: remove it from the database
+      db.delete(cfh, combinedKey);
+     } else {
+      // key present in the new data: replace it on the database
+      alreadyEditedKeys.add(new Bytes(key2));
+      db.put(cfh, combinedKey, valueToSetHere.data);
+     }
+
+     iterator.next();
+    }
+   }
+
+   // Add new keys, avoiding to add already changed keys
+   var mapIterator = bytesValue.fastIterator();
+   while (mapIterator.hasNext()) {
+    var mapEntry = mapIterator.next();
+    var key2 = mapEntry.getKey();
+    if (key2.data.length != key2Size) {
+     throw new IndexOutOfBoundsException(key2.data.length, key2Size, key2Size);
+    }
+
+    if (!alreadyEditedKeys.contains(key2)) {
+     var value2 = mapEntry.getValue();
+     db.put(cfh, getCombinedKey(key1, key2.data), value2.data);
+    }
+   }
+  } catch (RocksDBException ex) {
+   throw new IOException(ex);
+  }
+ }
+
+ //todo: use WriteBatch to enhance performance
+ /** Replaces one key1 group per element; keys1 and values must be parallel arrays of equal length. */
+ @Override
+ public void putMulti(byte[][] keys1, UnmodifiableIterableMap[] values) throws IOException {
+ if (keys1.length == values.length) {
+ for (int i = 0; i < keys1.length; i++) {
+ put(keys1[i], values[i]);
+ }
+ } else {
+ throw new IOException("Wrong parameters count");
+ }
+ }
+
+ /**
+  * Stores {@code value} under (key1, key2) and returns what {@code resultType} asks for:
+  * VALUE_CHANGED — whether the key was previously absent; PREVIOUS_VALUE — the old value, if any;
+  * VOID — empty.
+  */
+ @Override
+ public Optional<byte[]> put(byte[] key1, byte[] key2, byte[] value, LLDictionaryResultType resultType)
+   throws IOException {
+  if (key1.length != key1Size) {
+   throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
+  }
+  if (key2.length != key2Size) {
+   throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size);
+  }
+  try {
+   byte[] response = null;
+   var combinedKey = getCombinedKey(key1, key2);
+   switch (resultType) {
+    case VALUE_CHANGED:
+     response = LLUtils.booleanToResponse(!this.contains(null, key1, key2));
+     break;
+    case PREVIOUS_VALUE:
+     // Restored Holder<byte[]>: the raw type made getValue() return Object.
+     var data = new Holder<byte[]>();
+     if (db.keyMayExist(cfh, combinedKey, data)) {
+      if (data.getValue() != null) {
+       response = data.getValue();
+      } else {
+       response = db.get(cfh, combinedKey);
+      }
+     } else {
+      response = null;
+     }
+     break;
+   }
+   db.put(cfh, combinedKey, value);
+   return Optional.ofNullable(response);
+  } catch (RocksDBException e) {
+   throw new IOException(e);
+  }
+ }
+
+ //todo: use WriteBatch to enhance performance
+ /**
+  * Stores several (key2, value) pairs under a single key1; keys2 and values2 must be parallel arrays.
+  * Per-entry results are delivered to {@code responses} unless resultType is VOID.
+  */
+ @Override
+ public void putMulti(byte[] key1,
+ byte[][] keys2,
+ byte[][] values2,
+ LLDictionaryResultType resultType,
+ Consumer responses) throws IOException {
+ if (keys2.length == values2.length) {
+ for (int i = 0; i < keys2.length; i++) {
+ var result = put(key1, keys2[i], values2[i], resultType);
+ if (resultType != LLDictionaryResultType.VOID) {
+ responses.accept(result.orElse(NO_DATA));
+ }
+ }
+ } else {
+ throw new IOException("Wrong parameters count");
+ }
+ }
+
+ //todo: use WriteBatch to enhance performance
+ /**
+  * Stores several (key1, key2, value) triples; all three arrays must be parallel and equally sized.
+  * Per-entry results are delivered to {@code responses} unless resultType is VOID.
+  */
+ @Override
+ public void putMulti(byte[][] keys1,
+ byte[][] keys2,
+ byte[][] values2,
+ LLDictionaryResultType resultType,
+ Consumer responses) throws IOException {
+ if (keys1.length == keys2.length && keys2.length == values2.length) {
+ for (int i = 0; i < keys1.length; i++) {
+ var result = put(keys1[i], keys2[i], values2[i], resultType);
+ if (resultType != LLDictionaryResultType.VOID) {
+ responses.accept(result.orElse(NO_DATA));
+ }
+ }
+ } else {
+ throw new IOException("Wrong parameters count");
+ }
+ }
+
+ /**
+  * Deletes the entry under (key1, key2) and returns what {@code resultType} asks for:
+  * VALUE_CHANGED — whether the key existed; PREVIOUS_VALUE — the removed value, if any; VOID — empty.
+  */
+ @Override
+ public Optional<byte[]> remove(byte[] key1, byte[] key2, LLDictionaryResultType resultType) throws IOException {
+  if (key1.length != key1Size) {
+   throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
+  }
+  if (key2.length != key2Size) {
+   throw new IndexOutOfBoundsException(key2.length, key2Size, key2Size);
+  }
+  try {
+   byte[] response = null;
+   var combinedKey = getCombinedKey(key1, key2);
+   switch (resultType) {
+    case VALUE_CHANGED:
+     response = LLUtils.booleanToResponse(this.contains(null, key1, key2));
+     break;
+    case PREVIOUS_VALUE:
+     // Restored Holder<byte[]>: the raw type made getValue() return Object.
+     var data = new Holder<byte[]>();
+     if (db.keyMayExist(cfh, combinedKey, data)) {
+      if (data.getValue() != null) {
+       response = data.getValue();
+      } else {
+       response = db.get(cfh, combinedKey);
+      }
+     } else {
+      response = null;
+     }
+     break;
+   }
+   db.delete(cfh, combinedKey);
+   return Optional.ofNullable(response);
+  } catch (RocksDBException e) {
+   throw new IOException(e);
+  }
+ }
+
+ /** Visits every (key1, key2, value) triple in the dictionary. {@code parallelism} is currently ignored. */
+ @Override
+ public void forEach(@Nullable LLSnapshot snapshot, int parallelism, TriConsumer consumer) {
+ forEach_(consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism);
+ }
+
+ //todo: implement parallel execution
+ /**
+  * Sequentially iterates the full column family, splitting each combined key into (key1, key2).
+  * NOTE(review): the ReadOptions created for the snapshot branch is never closed — native leak; verify.
+  */
+ private void forEach_(TriConsumer consumer, @Nullable Snapshot snapshot, int parallelism) {
+ try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot))
+ : db.newIterator(cfh))) {
+ iterator.seekToFirst();
+ while (iterator.isValid()) {
+ var combinedKey = iterator.key();
+ var key1 = getKey1(combinedKey);
+ var key2 = getKey2(combinedKey);
+
+ consumer.accept(key1, key2, iterator.value());
+
+ iterator.next();
+ }
+ }
+ }
+
+ /** Visits each key1 group once, with all of its (key2, value) pairs. {@code parallelism} is currently ignored. */
+ @Override
+ public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer> consumer) {
+ forEach_(consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism);
+ }
+
+ //todo: implement parallel execution
+ /**
+  * Sequentially groups consecutive entries by key1 (keys are stored sorted, so each group is
+  * contiguous) and hands each group to the consumer as an unmodifiable map.
+  */
+ private void forEach_(BiConsumer<byte[], UnmodifiableIterableMap<byte[], byte[]>> consumer, @Nullable Snapshot snapshot, int parallelism) {
+  try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot))
+    : db.newIterator(cfh))) {
+   iterator.seekToFirst();
+   byte[] currentKey1 = null;
+   // only append or iterate on this object! byte[].equals() and hash is not trustworthy!
+   List<byte[]> key2Keys = null;
+   // only append or iterate on this object! byte[].equals() and hash is not trustworthy!
+   List<byte[]> key2Values = null;
+   while (iterator.isValid()) {
+    var combinedKey = iterator.key();
+    var key1 = getKey1(combinedKey);
+
+    if (currentKey1 == null || !Arrays.equals(currentKey1, key1)) {
+     // Entered a new key1 group: flush the previous one first
+     if (currentKey1 != null && !key2Values.isEmpty()) {
+      consumer.accept(currentKey1, UnmodifiableIterableMap.of(key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new)));
+     }
+     currentKey1 = key1;
+     key2Keys = new ArrayList<>();
+     key2Values = new ArrayList<>();
+    }
+
+    key2Keys.add(getKey2(combinedKey));
+    key2Values.add(iterator.value());
+
+    iterator.next();
+   }
+   // Flush the trailing group, if any
+   if (currentKey1 != null && !key2Values.isEmpty()) {
+    consumer.accept(currentKey1, UnmodifiableIterableMap.of(key2Keys.toArray(byte[][]::new), key2Values.toArray(byte[][]::new)));
+   }
+  }
+ }
+
+ /** Visits every (key2, value) pair stored under {@code key}. {@code parallelism} is currently ignored. */
+ @Override
+ public void forEach(@Nullable LLSnapshot snapshot, int parallelism, byte[] key, BiConsumer consumer) {
+ forEach_(key, consumer, snapshot == null ? null : snapshotResolver.apply(snapshot), parallelism);
+ }
+
+ //todo: implement parallel execution
+ /** Iterates only the {@code key1} group: seeks its start key and stops when the prefix no longer matches. */
+ private void forEach_(byte[] key1, BiConsumer consumer, @Nullable Snapshot snapshot, int parallelism) {
+ try (RocksIterator iterator = (snapshot != null ? db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot))
+ : db.newIterator(cfh))) {
+ iterator.seek(getStartSeekKey(key1));
+ while (iterator.isValid()) {
+ byte[] combinedKey = iterator.key();
+
+ if (!isSubKey(key1, combinedKey)) {
+ // The key is outside of key1: exit from the iteration
+ break;
+ }
+
+ byte[] key2 = getKey2(combinedKey);
+ byte[] value2 = iterator.value();
+ consumer.accept(key2, value2);
+
+ iterator.next();
+ }
+ }
+ }
+
+ //todo: implement parallel execution
+ //todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible
+ /**
+  * Atomically-ish rewrites every entry: pass 1 buffers a delete of each existing key into the
+  * write batch, pass 2 re-reads the snapshot-backed iterator (unaffected by the buffered
+  * deletes) and buffers the transformed entries; everything is then written in one batch.
+  */
+ @Override
+ public void replaceAll(int parallelism, boolean replaceKeys, TriFunction<byte[], byte[], byte[], ImmutableTriple<byte[], byte[], byte[]>> consumer) throws IOException {
+  var snapshot = db.getSnapshot();
+  try {
+   try (RocksIterator iter = db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot));
+     CappedWriteBatch writeBatch = new CappedWriteBatch(db, LLLocalDictionary.CAPPED_WRITE_BATCH_CAP, LLLocalDictionary.RESERVED_WRITE_BATCH_SIZE, LLLocalDictionary.MAX_WRITE_BATCH_SIZE, LLLocalDictionary.BATCH_WRITE_OPTIONS)) {
+
+    // Pass 1: schedule deletion of every existing entry
+    iter.seekToFirst();
+    while (iter.isValid()) {
+     writeBatch.delete(cfh, iter.key());
+     iter.next();
+    }
+
+    // Pass 2: re-insert every entry transformed by the consumer
+    iter.seekToFirst();
+    while (iter.isValid()) {
+     var combinedKey = iter.key();
+     var key1 = getKey1(combinedKey);
+     var key2 = getKey2(combinedKey);
+
+     var result = consumer.apply(key1, key2, iter.value());
+     if (result.getLeft().length != key1Size) {
+      throw new IndexOutOfBoundsException(result.getLeft().length, key1Size, key1Size);
+     }
+     if (result.getMiddle().length != key2Size) {
+      throw new IndexOutOfBoundsException(result.getMiddle().length, key2Size, key2Size);
+     }
+
+     writeBatch.put(cfh, getCombinedKey(result.getLeft(), result.getMiddle()), result.getRight());
+
+     iter.next();
+    }
+
+    writeBatch.writeToDbAndClose();
+   }
+  } catch (RocksDBException ex) {
+   throw new IOException(ex);
+  } finally {
+   db.releaseSnapshot(snapshot);
+   snapshot.close();
+  }
+ }
+
+ //todo: implement parallel execution
+ //todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible
+ /**
+  * Like {@link #replaceAll(int, boolean, TriFunction)} but transforms one whole key1 group at a
+  * time: consecutive entries (keys are sorted, so groups are contiguous) are collected and handed
+  * to {@code consumer} via {@code replaceAll_}; all mutations are buffered in one write batch.
+  */
+ @Override
+ public void replaceAll(int parallelism, boolean replaceKeys, BiFunction<byte[], UnmodifiableMap<Bytes, byte[]>, Entry<byte[], UnmodifiableMap<Bytes, byte[]>>> consumer)
+   throws IOException {
+  try {
+   var snapshot = db.getSnapshot();
+   try (RocksIterator iter = db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot));
+     CappedWriteBatch writeBatch = new CappedWriteBatch(db, LLLocalDictionary.CAPPED_WRITE_BATCH_CAP, LLLocalDictionary.RESERVED_WRITE_BATCH_SIZE, LLLocalDictionary.MAX_WRITE_BATCH_SIZE, LLLocalDictionary.BATCH_WRITE_OPTIONS)) {
+
+    // Pass 1: schedule deletion of every existing entry
+    iter.seekToFirst();
+    while (iter.isValid()) {
+     writeBatch.delete(cfh, iter.key());
+     iter.next();
+    }
+
+    // Pass 2: group by key1 and re-insert each transformed group
+    iter.seekToFirst();
+
+    byte[] currentKey1 = null;
+    // only append or iterate on this object! byte[].equals() and hash is not trustworthy!
+    ObjectArrayList<byte[]> key2Keys = null;
+    // only append or iterate on this object! byte[].equals() and hash is not trustworthy!
+    ObjectArrayList<byte[]> key2Values = null;
+    while (iter.isValid()) {
+     var combinedKey = iter.key();
+     var key1 = getKey1(combinedKey);
+
+     if (currentKey1 == null || !Arrays.equals(currentKey1, key1)) {
+      // Entered a new key1 group: flush the previous one first
+      if (currentKey1 != null && !key2Values.isEmpty()) {
+       replaceAll_(writeBatch,
+         currentKey1,
+         key2Keys.toArray(byte[][]::new),
+         key2Values.toArray(byte[][]::new),
+         consumer
+       );
+      }
+      currentKey1 = key1;
+      key2Keys = new ObjectArrayList<>();
+      key2Values = new ObjectArrayList<>();
+     }
+
+     key2Keys.add(getKey2(combinedKey));
+     key2Values.add(iter.value());
+
+     iter.next();
+    }
+    // Flush the trailing group, if any
+    if (currentKey1 != null && !key2Values.isEmpty()) {
+     replaceAll_(writeBatch,
+       currentKey1,
+       key2Keys.toArray(byte[][]::new),
+       key2Values.toArray(byte[][]::new),
+       consumer
+     );
+    }
+
+    writeBatch.writeToDbAndClose();
+   } finally {
+    db.releaseSnapshot(snapshot);
+    snapshot.close();
+   }
+  } catch (RocksDBException exception) {
+   throw new IOException(exception);
+  }
+ }
+
+ /**
+  * Applies {@code consumer} to one key1 group and buffers the resulting entries into
+  * {@code writeBatch}. Validates key1, the returned key1, and every returned key2 length.
+  * NOTE(review): the entries are written under the ORIGINAL {@code key1}, not the returned
+  * {@code resultKey1} — confirm whether key replacement is intended here.
+  */
+ private void replaceAll_(WriteBatchInterface writeBatch,
+   byte[] key1,
+   byte[][] key2Keys,
+   byte[][] key2Values,
+   BiFunction<byte[], UnmodifiableMap<Bytes, byte[]>, Entry<byte[], UnmodifiableMap<Bytes, byte[]>>> consumer)
+   throws RocksDBException {
+  if (key1.length != key1Size) {
+   throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
+  }
+  var previousValues = UnmodifiableMap.of(key2Keys, key2Values);
+  var result = consumer.apply(key1, previousValues);
+
+  var resultKey1 = result.getKey();
+  if (resultKey1.length != key1Size) {
+   throw new IndexOutOfBoundsException(resultKey1.length, key1Size, key1Size);
+  }
+  var resultValues = result.getValue();
+
+  var mapIterator = resultValues.fastIterator();
+  while (mapIterator.hasNext()) {
+   var mapEntry = mapIterator.next();
+   var key2 = mapEntry.getKey();
+   if (key2.data.length != key2Size) {
+    throw new IndexOutOfBoundsException(key2.data.length, key2Size, key2Size);
+   }
+
+   var value2 = mapEntry.getValue();
+   writeBatch.put(cfh, getCombinedKey(key1, key2.data), value2);
+  }
+ }
+
+ //todo: implement parallel execution
+ //todo: implement replaceKeys = false optimization (like in LLLocalDictionary), check if it's feasible
+ /**
+  * Rewrites only the {@code key1} group: pass 1 buffers deletes of its entries, pass 2 re-reads
+  * the snapshot-backed group and buffers the transformed (key2, value) pairs back under key1.
+  */
+ @Override
+ public void replaceAll(int parallelism, boolean replaceKeys, byte[] key1, BiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
+  if (key1.length != key1Size) {
+   throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
+  }
+  try {
+   var snapshot = db.getSnapshot();
+   try (RocksIterator iter = db.newIterator(cfh, new ReadOptions().setSnapshot(snapshot));
+     CappedWriteBatch writeBatch = new CappedWriteBatch(db, LLLocalDictionary.CAPPED_WRITE_BATCH_CAP, LLLocalDictionary.RESERVED_WRITE_BATCH_SIZE, LLLocalDictionary.MAX_WRITE_BATCH_SIZE, LLLocalDictionary.BATCH_WRITE_OPTIONS)) {
+
+    // Pass 1: schedule deletion of every entry in the key1 group
+    iter.seek(getStartSeekKey(key1));
+
+    while (iter.isValid()) {
+     byte[] combinedKey = iter.key();
+
+     if (!isSubKey(key1, combinedKey)) {
+      // The key is outside of key1: exit from the iteration
+      break;
+     }
+
+     writeBatch.delete(cfh, combinedKey);
+
+     iter.next();
+    }
+
+    // Pass 2: re-insert the transformed entries
+    iter.seek(getStartSeekKey(key1));
+
+    while (iter.isValid()) {
+     byte[] combinedKey = iter.key();
+
+     if (!isSubKey(key1, combinedKey)) {
+      // The key is outside of key1: exit from the iteration
+      break;
+     }
+
+     byte[] key2 = getKey2(combinedKey);
+     byte[] value2 = iter.value();
+
+     var result = consumer.apply(key2, value2);
+     if (result.getKey().length != key2Size) {
+      throw new IndexOutOfBoundsException(result.getKey().length, key2Size, key2Size);
+     }
+
+     // BUGFIX: the bare key2 was written, dropping the key1 prefix and corrupting the
+     // keyspace; the new key2 must be re-combined with key1.
+     writeBatch.put(cfh, getCombinedKey(key1, result.getKey()), result.getValue());
+
+     iter.next();
+    }
+
+    writeBatch.writeToDbAndClose();
+   } finally {
+    db.releaseSnapshot(snapshot);
+    snapshot.close();
+   }
+  } catch (RocksDBException e) {
+   throw new IOException(e);
+  }
+ }
+
+ // This method is exactly the same of LLLocalDictionary. Remember to keep the code equal
+ /**
+  * Deletes every entry in the column family, then drops the related files, compacts, and flushes.
+  * The loop repeats because deleteRange's end key is exclusive: the last key is deleted separately,
+  * and concurrent writers may re-add data between rounds. Fails if the dictionary is not empty afterwards.
+  */
+ @Override
+ public void clear() throws IOException {
+ try {
+ List ranges = new ArrayList<>();
+ byte[] firstKey = null;
+ byte[] lastKey = null;
+ boolean empty = false;
+ while (!empty) {
+ // retrieve the range extremities
+ try (RocksIterator iter = db.newIterator(cfh)) {
+ iter.seekToFirst();
+ if (iter.isValid()) {
+ firstKey = iter.key();
+ iter.seekToLast();
+ lastKey = iter.key();
+ ranges.add(firstKey);
+ ranges.add(lastKey);
+ } else {
+ empty = true;
+ }
+ }
+
+ if (!empty) {
+ if (Arrays.equals(firstKey, lastKey)) {
+ // Delete single key
+ db.delete(cfh, lastKey);
+ } else {
+ // Delete all
+ db.deleteRange(cfh, firstKey, lastKey);
+ // Delete the end because it's not included in the deleteRange domain
+ db.delete(cfh, lastKey);
+ }
+ }
+ }
+
+ // Delete files related
+ db.deleteFilesInRanges(cfh, ranges, true);
+
+ // Compact range
+ db.compactRange(cfh);
+
+ db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh);
+ db.flushWal(true);
+
+ var finalSize = exactSize(null);
+ if (finalSize != 0) {
+ throw new IllegalStateException("The dictionary is not empty after calling clear()");
+ }
+ } catch (RocksDBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public Optional> clear(byte[] key1, LLDictionaryResultType resultType)
+ throws IOException {
+ if (key1.length != key1Size) {
+ throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
+ }
+ try {
+ Optional> result;
+ switch (resultType) {
+ case PREVIOUS_VALUE:
+ List keys = new ArrayList<>();
+ List values = new ArrayList<>();
+ try (RocksIterator iter = db.newIterator(cfh)) {
+ iter.seek(getStartSeekKey(key1));
+ while (iter.isValid()) {
+ var combinedKey = iter.key();
+
+ if (!isSubKey(key1, combinedKey)) {
+ break;
+ }
+
+ keys.add(getKey2(combinedKey));
+ values.add(iter.value());
+ }
+ }
+ result = Optional.of(UnmodifiableIterableMap.of(keys.toArray(byte[][]::new), values.toArray(byte[][]::new)));
+ break;
+ case VALUE_CHANGED:
+ if (isEmpty(null, key1)) {
+ result = Optional.empty();
+ } else {
+ result = Optional.of(UnmodifiableIterableMap.of(NO_DATA_MAP, NO_DATA_MAP));
+ }
+ break;
+ case VOID:
+ default:
+ result = Optional.empty();
+ break;
+ }
+ db.deleteRange(cfh, getStartSeekKey(key1), getEndSeekKey(key1));
+ return result;
+ } catch (RocksDBException ex) {
+ throw new IOException(ex);
+ }
+ }
+
+ /** Returns the entry count: an O(1) RocksDB estimate when {@code fast}, otherwise a full scan. */
+ @Override
+ public long size(@Nullable LLSnapshot snapshot, boolean fast) {
+ return fast ? fastSize(snapshot) : exactSize(snapshot);
+ }
+
+ /**
+  * Estimated size via the "rocksdb.estimate-num-keys" property; falls back to an exact scan for
+  * snapshots, since the estimate cannot be snapshot-bound.
+  * NOTE(review): on RocksDBException the error is printed and 0 is returned — consider proper logging.
+  */
+ public long fastSize(@Nullable LLSnapshot snapshot) {
+ try {
+ if (snapshot != null) {
+ return this.exactSize(snapshot);
+ }
+ return db.getLongProperty(cfh, "rocksdb.estimate-num-keys");
+ } catch (RocksDBException e) {
+ e.printStackTrace();
+ return 0;
+ }
+ }
+
+ /**
+  * Counts the distinct key1 groups by scanning all combined keys in sorted order and incrementing
+  * whenever the key1 prefix changes (isSubKey(null, ...) is false, so the first key always counts).
+  * NOTE(review): this counts key1 groups, not total (key1, key2) entries — confirm intended semantics.
+  */
+ public long exactSize(@Nullable LLSnapshot snapshot) {
+ long count = 0;
+ byte[] currentKey1 = null;
+ try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
+ iter.seekToFirst();
+ while (iter.isValid()) {
+ byte[] combinedKey = iter.key();
+
+ if (!isSubKey(currentKey1, combinedKey)) {
+ count++;
+ currentKey1 = getKey1(combinedKey);
+ }
+ iter.next();
+ }
+ return count;
+ }
+ }
+
+ /** Counts the (key2, value) entries stored under {@code key1} by scanning its contiguous group. */
+ @Override
+ public long exactSize(@Nullable LLSnapshot snapshot, byte[] key1) {
+ if (key1.length != key1Size) {
+ throw new IndexOutOfBoundsException(key1.length, key1Size, key1Size);
+ }
+ long count = 0;
+ try (RocksIterator iterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
+ iterator.seek(getStartSeekKey(key1));
+ while (iterator.isValid()) {
+ byte[] combinedKey = iterator.key();
+
+ if (!isSubKey(key1, combinedKey)) {
+ // The key is outside of key1: exit from the iteration
+ break;
+ }
+
+ count++;
+ iterator.next();
+ }
+ }
+ return count;
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
new file mode 100644
index 0000000..5d4fe90
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
@@ -0,0 +1,373 @@
+package it.cavallium.dbengine.database.disk;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Holder;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.warp.commonutils.concurrency.atomicity.NotAtomic;
+import it.cavallium.dbengine.database.LLDictionary;
+import it.cavallium.dbengine.database.LLDictionaryResultType;
+import it.cavallium.dbengine.database.LLSnapshot;
+import it.cavallium.dbengine.database.LLUtils;
+
+/**
+ * A flat key/value dictionary backed by a single RocksDB column family.
+ *
+ * Not atomic: concurrent writers may interleave with the read-then-write
+ * operations in {@link #put} and {@link #remove}.
+ *
+ * Fixed: restored the generic type parameters that were missing throughout
+ * (raw {@code Optional}, {@code Holder}, {@code Function}, {@code List},
+ * {@code Consumer}, {@code BiConsumer}, {@code BiFunction}).
+ */
+@NotAtomic
+public class LLLocalDictionary implements LLDictionary {
+
+	private static final boolean USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS = true;
+	static final int RESERVED_WRITE_BATCH_SIZE = 2 * 1024 * 1024; // 2MiB
+	static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB
+	static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
+	static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true);
+
+	// Sentinel buffer used only to probe key existence without copying the value.
+	private static final byte[] NO_DATA = new byte[0];
+	private static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions();
+	private static final List<byte[]> EMPTY_UNMODIFIABLE_LIST = List.of();
+	private final RocksDB db;
+	private final ColumnFamilyHandle cfh;
+	private final String databaseName;
+	// Maps an engine-level snapshot handle to the underlying RocksDB snapshot.
+	private final Function<LLSnapshot, Snapshot> snapshotResolver;
+
+	public LLLocalDictionary(@NotNull RocksDB db,
+			@NotNull ColumnFamilyHandle columnFamilyHandle,
+			String databaseName,
+			Function<LLSnapshot, Snapshot> snapshotResolver) {
+		Objects.requireNonNull(db);
+		this.db = db;
+		Objects.requireNonNull(columnFamilyHandle);
+		this.cfh = columnFamilyHandle;
+		this.databaseName = databaseName;
+		this.snapshotResolver = snapshotResolver;
+	}
+
+	@Override
+	public String getDatabaseName() {
+		return databaseName;
+	}
+
+	/** Resolves an engine snapshot into ReadOptions, or shared empty options when null. */
+	private ReadOptions resolveSnapshot(LLSnapshot snapshot) {
+		if (snapshot != null) {
+			return getReadOptions(snapshotResolver.apply(snapshot));
+		} else {
+			return EMPTY_READ_OPTIONS;
+		}
+	}
+
+	/** Wraps a RocksDB snapshot into ReadOptions, or shared empty options when null. */
+	private ReadOptions getReadOptions(Snapshot snapshot) {
+		if (snapshot != null) {
+			return new ReadOptions().setSnapshot(snapshot);
+		} else {
+			return EMPTY_READ_OPTIONS;
+		}
+	}
+
+	/**
+	 * Reads the value of a key.
+	 *
+	 * keyMayExist is used first: it can answer from memtable/bloom filters without
+	 * touching the SST files, and may even return the value inline in the holder.
+	 *
+	 * @return the value, or empty if absent
+	 * @throws IOException if RocksDB fails
+	 */
+	@Override
+	public Optional<byte[]> get(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
+		try {
+			Holder<byte[]> data = new Holder<>();
+			if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) {
+				if (data.getValue() != null) {
+					return Optional.of(data.getValue());
+				} else {
+					// keyMayExist can report false positives: confirm with a real get.
+					byte[] value = db.get(cfh, resolveSnapshot(snapshot), key);
+					return Optional.ofNullable(value);
+				}
+			} else {
+				return Optional.empty();
+			}
+		} catch (RocksDBException e) {
+			throw new IOException(e);
+		}
+	}
+
+	@Override
+	public boolean contains(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
+		return contains_(snapshot, key);
+	}
+
+	/** Existence probe shared by contains/put/remove; avoids copying the value when possible. */
+	private boolean contains_(@Nullable LLSnapshot snapshot, byte[] key) throws IOException {
+		try {
+			int size = RocksDB.NOT_FOUND;
+			Holder<byte[]> data = new Holder<>();
+			if (db.keyMayExist(cfh, resolveSnapshot(snapshot), key, data)) {
+				if (data.getValue() != null) {
+					size = data.getValue().length;
+				} else {
+					// Only the size is needed, so pass the NO_DATA sink buffer.
+					size = db.get(cfh, resolveSnapshot(snapshot), key, NO_DATA);
+				}
+			}
+			return size != RocksDB.NOT_FOUND;
+		} catch (RocksDBException e) {
+			throw new IOException(e);
+		}
+	}
+
+	/**
+	 * Writes a value and optionally reports the previous state.
+	 *
+	 * Not atomic: the pre-read and the put are separate operations.
+	 *
+	 * @param resultType VOID (no response), VALUE_CHANGED (whether the key was new)
+	 *                   or PREVIOUS_VALUE (the old value, if any)
+	 * @throws IOException if RocksDB fails
+	 */
+	@Override
+	public Optional<byte[]> put(byte[] key, byte[] value, LLDictionaryResultType resultType) throws IOException {
+		try {
+			byte[] response = null;
+			switch (resultType) {
+				case VALUE_CHANGED:
+					response = LLUtils.booleanToResponse(!contains_(null, key));
+					break;
+				case PREVIOUS_VALUE:
+					var data = new Holder<byte[]>();
+					if (db.keyMayExist(cfh, key, data)) {
+						if (data.getValue() != null) {
+							response = data.getValue();
+						} else {
+							response = db.get(cfh, key);
+						}
+					} else {
+						response = null;
+					}
+					break;
+			}
+			db.put(cfh, key, value);
+			return Optional.ofNullable(response);
+		} catch (RocksDBException e) {
+			throw new IOException(e);
+		}
+	}
+
+	/**
+	 * Writes many key/value pairs in a single WriteBatch.
+	 *
+	 * Unless resultType is VOID, the previous values are fetched with a multi-get
+	 * before the batch is written and then streamed to the consumer.
+	 *
+	 * @throws IOException if the arrays differ in length or RocksDB fails
+	 */
+	@Override
+	public void putMulti(byte[][] key, byte[][] value, LLDictionaryResultType resultType, Consumer<byte[]> responsesConsumer)
+			throws IOException {
+		if (key.length == value.length) {
+			List<byte[]> responses;
+			try (WriteBatch writeBatch = new WriteBatch(RESERVED_WRITE_BATCH_SIZE)) {
+
+				if (resultType == LLDictionaryResultType.VOID) {
+					responses = EMPTY_UNMODIFIABLE_LIST;
+				} else {
+					responses = db.multiGetAsList(newCfhList(cfh, key.length), Arrays.asList(key));
+				}
+
+				for (int i = 0; i < key.length; i++) {
+					writeBatch.put(cfh, key[i], value[i]);
+				}
+
+				db.write(BATCH_WRITE_OPTIONS, writeBatch);
+			} catch (RocksDBException e) {
+				throw new IOException(e);
+			}
+
+			for (byte[] response : responses) {
+				responsesConsumer.accept(response);
+			}
+		} else {
+			throw new IOException("Wrong parameters count");
+		}
+	}
+
+	/** Builds a list repeating the same column family handle, as required by multiGetAsList. */
+	private static List<ColumnFamilyHandle> newCfhList(ColumnFamilyHandle cfh, int size) {
+		var list = new ArrayList<ColumnFamilyHandle>(size);
+		for (int i = 0; i < size; i++) {
+			list.add(cfh);
+		}
+		return list;
+	}
+
+	/**
+	 * Deletes a key and optionally reports the previous state.
+	 *
+	 * Not atomic: the pre-read and the delete are separate operations.
+	 *
+	 * @throws IOException if RocksDB fails
+	 */
+	@Override
+	public Optional<byte[]> remove(byte[] key, LLDictionaryResultType resultType) throws IOException {
+		try {
+			byte[] response = null;
+			switch (resultType) {
+				case VALUE_CHANGED:
+					response = LLUtils.booleanToResponse(contains_(null, key));
+					break;
+				case PREVIOUS_VALUE:
+					var data = new Holder<byte[]>();
+					if (db.keyMayExist(cfh, key, data)) {
+						if (data.getValue() != null) {
+							response = data.getValue();
+						} else {
+							response = db.get(cfh, key);
+						}
+					} else {
+						response = null;
+					}
+					break;
+			}
+			db.delete(cfh, key);
+			return Optional.ofNullable(response);
+		} catch (RocksDBException e) {
+			throw new IOException(e);
+		}
+	}
+
+	//todo: implement parallel forEach
+	/** Sequentially visits every entry. The parallelism hint is currently ignored. */
+	@Override
+	public void forEach(@Nullable LLSnapshot snapshot, int parallelism, BiConsumer<byte[], byte[]> consumer) {
+		try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
+			iter.seekToFirst();
+			while (iter.isValid()) {
+				consumer.accept(iter.key(), iter.value());
+				iter.next();
+			}
+		}
+	}
+
+	//todo: implement parallel replace
+	/**
+	 * Replaces every entry with the result of the given function.
+	 *
+	 * When replaceKeys is true, the whole column family is first deleted inside the
+	 * batch (iterating over a RocksDB snapshot so the source data stays visible),
+	 * then the remapped entries are re-inserted; keys may change. When false, the
+	 * function must not change keys, and only changed values are written.
+	 *
+	 * @throws IOException if a key changes while replaceKeys is false, or RocksDB fails
+	 */
+	@Override
+	public void replaceAll(int parallelism, boolean replaceKeys, BiFunction<byte[], byte[], Entry<byte[], byte[]>> consumer) throws IOException {
+		try {
+			// A snapshot is only needed when keys can change, to keep reading the old data
+			// while deletes accumulate in the batch. Snapshot.close() does not release it;
+			// the explicit releaseSnapshot below does (null-safe in the RocksDB API).
+			try (var snapshot = replaceKeys ? db.getSnapshot() : null) {
+				try (RocksIterator iter = db.newIterator(cfh, getReadOptions(snapshot));
+						CappedWriteBatch writeBatch = new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, BATCH_WRITE_OPTIONS)) {
+
+					iter.seekToFirst();
+
+					if (replaceKeys) {
+						while (iter.isValid()) {
+							writeBatch.delete(cfh, iter.key());
+
+							iter.next();
+						}
+					}
+
+					iter.seekToFirst();
+
+					while (iter.isValid()) {
+
+						var result = consumer.apply(iter.key(), iter.value());
+						boolean keyDiffers = !Arrays.equals(iter.key(), result.getKey());
+						if (!replaceKeys && keyDiffers) {
+							throw new IOException("Tried to replace a key");
+						}
+
+						// put if changed or if keys can be swapped/replaced
+						if (replaceKeys || !Arrays.equals(iter.value(), result.getValue())) {
+							writeBatch.put(cfh, result.getKey(), result.getValue());
+						}
+
+						iter.next();
+					}
+
+					writeBatch.writeToDbAndClose();
+				} finally {
+					db.releaseSnapshot(snapshot);
+				}
+			}
+		} catch (RocksDBException e) {
+			throw new IOException(e);
+		}
+	}
+
+	// This method is exactly the same of LLLocalDeepDictionary. Remember to keep the code equal
+	/**
+	 * Deletes every entry, compacts and flushes, then verifies emptiness.
+	 *
+	 * @throws IOException if RocksDB fails
+	 * @throws IllegalStateException if entries remain after clearing
+	 */
+	@Override
+	public void clear() throws IOException {
+		try (RocksIterator iter = db.newIterator(cfh);
+				CappedWriteBatch writeBatch = new CappedWriteBatch(db, CAPPED_WRITE_BATCH_CAP, RESERVED_WRITE_BATCH_SIZE, MAX_WRITE_BATCH_SIZE, BATCH_WRITE_OPTIONS)) {
+
+			iter.seekToFirst();
+
+			while (iter.isValid()) {
+				writeBatch.delete(cfh, iter.key());
+
+				iter.next();
+			}
+
+			writeBatch.writeToDbAndClose();
+
+			// Compact range
+			db.compactRange(cfh);
+
+			db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh);
+			db.flushWal(true);
+
+			var finalSize = exactSize(null);
+			if (finalSize != 0) {
+				throw new IllegalStateException("The dictionary is not empty after calling clear()");
+			}
+		} catch (RocksDBException e) {
+			throw new IOException(e);
+		}
+	}
+
+	/** Returns the number of entries, estimated when fast is true. */
+	@Override
+	public long size(@Nullable LLSnapshot snapshot, boolean fast) throws IOException {
+		return fast ? fastSize(snapshot) : exactSize(snapshot);
+	}
+
+	/**
+	 * Best-effort size estimate.
+	 *
+	 * When USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS is set (or no snapshot is given)
+	 * the current "rocksdb.estimate-num-keys" property is returned even for snapshot
+	 * reads. Otherwise the snapshot is iterated, capped at 1000 elements.
+	 *
+	 * @return the estimate, or 0 if the property cannot be read
+	 */
+	public long fastSize(@Nullable LLSnapshot snapshot) {
+		var rocksdbSnapshot = resolveSnapshot(snapshot);
+		if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) {
+			try {
+				return db.getLongProperty(cfh, "rocksdb.estimate-num-keys");
+			} catch (RocksDBException e) {
+				// Deliberate best-effort: degrade to 0 instead of failing the caller.
+				e.printStackTrace();
+				return 0;
+			}
+		} else {
+			long count = 0;
+			try (RocksIterator iter = db.newIterator(cfh, rocksdbSnapshot)) {
+				iter.seekToFirst();
+				// If it's a fast size of a snapshot, count only up to 1000 elements
+				while (iter.isValid() && count < 1000) {
+					count++;
+					iter.next();
+				}
+				return count;
+			}
+		}
+	}
+
+	/** Counts every entry with a full scan; O(n). */
+	public long exactSize(@Nullable LLSnapshot snapshot) {
+		long count = 0;
+		try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
+			iter.seekToFirst();
+			while (iter.isValid()) {
+				count++;
+				iter.next();
+			}
+			return count;
+		}
+	}
+
+	/** True if the column family has no entries (checks only the first iterator position). */
+	@Override
+	public boolean isEmpty(@Nullable LLSnapshot snapshot) {
+		try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
+			iter.seekToFirst();
+			if (iter.isValid()) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	/**
+	 * Removes and returns the first entry in iteration order, if any.
+	 *
+	 * Not atomic: another writer may race between the read and the delete.
+	 *
+	 * @throws IOException if RocksDB fails
+	 */
+	@Override
+	public Optional<Entry<byte[], byte[]>> removeOne() throws IOException {
+		try (RocksIterator iter = db.newIterator(cfh)) {
+			iter.seekToFirst();
+			if (iter.isValid()) {
+				byte[] key = iter.key();
+				byte[] value = iter.value();
+				db.delete(cfh, key);
+				return Optional.of(Map.entry(key, value));
+			}
+		} catch (RocksDBException e) {
+			throw new IOException(e);
+		}
+		return Optional.empty();
+	}
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
new file mode 100644
index 0000000..2b16135
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java
@@ -0,0 +1,431 @@
+package it.cavallium.dbengine.database.disk;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.DBOptions;
+import org.rocksdb.DbPath;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.Snapshot;
+import org.rocksdb.WALRecoveryMode;
+import it.cavallium.dbengine.database.Column;
+import it.cavallium.dbengine.database.LLDeepDictionary;
+import it.cavallium.dbengine.database.LLDictionary;
+import it.cavallium.dbengine.database.LLKeyValueDatabase;
+import it.cavallium.dbengine.database.LLSingleton;
+import it.cavallium.dbengine.database.LLSnapshot;
+
+public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
+
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ private static final ColumnFamilyDescriptor DEFAULT_COLUMN_FAMILY = new ColumnFamilyDescriptor(
+ RocksDB.DEFAULT_COLUMN_FAMILY);
+
+ private final Path dbPath;
+ private final String name;
+ private RocksDB db;
+ private final Map handles;
+ private final ConcurrentHashMap snapshotsHandles = new ConcurrentHashMap<>();
+ private final AtomicLong nextSnapshotNumbers = new AtomicLong(1);
+
+ public LLLocalKeyValueDatabase(String name, Path path, List columns, List handles,
+ boolean crashIfWalError, boolean lowMemory) throws IOException {
+ Options options = openRocksDb(path, crashIfWalError, lowMemory);
+ try {
+ List descriptors = new LinkedList<>();
+ for (Column column : columns) {
+ descriptors
+ .add(new ColumnFamilyDescriptor(column.getName().getBytes(StandardCharsets.US_ASCII)));
+ }
+
+ // Get databases directory path
+ Path databasesDirPath = path.toAbsolutePath().getParent();
+
+ String dbPathString = databasesDirPath.toString() + File.separatorChar + path.getFileName();
+ Path dbPath = Paths.get(dbPathString);
+ this.dbPath = dbPath;
+ this.name = name;
+
+ createIfNotExists(descriptors, options, dbPath, dbPathString);
+ // Create all column families that don't exist
+ createAllColumns(descriptors, options, dbPathString);
+
+ // a factory method that returns a RocksDB instance
+ this.db = RocksDB.open(new DBOptions(options), dbPathString, descriptors, handles);
+ this.handles = new HashMap<>();
+ for (int i = 0; i < columns.size(); i++) {
+ this.handles.put(columns.get(i), handles.get(i));
+ }
+
+ /*
+ System.out.println("----Data----");
+ this.handles.forEach((Column column, ColumnFamilyHandle hnd) -> {
+ System.out.println("Column: " + column.getName());
+ if (!column.getName().contains("hash")) {
+ var val = new ArrayList();
+ var iter = db.newIterator(hnd);
+ iter.seekToFirst();
+ while (iter.isValid()) {
+ val.add(Column.toString(iter.key()));
+ System.out.println(" " + Column.toString(iter.key()));
+ iter.next();
+ }
+ }
+ });
+ */
+
+ /*
+ System.out.println("----Columns----");
+ this.handles.forEach((Column column, ColumnFamilyHandle hnd) -> {
+ System.out.println("Column: " + column.getName());
+ });
+ */
+
+ flushDb(db, handles);
+ } catch (RocksDBException ex) {
+ throw new IOException(ex);
+ }
+ }
+
+ @Override
+ public String getDatabaseName() {
+ return name;
+ }
+
+ private void flushAndCloseDb(RocksDB db, List handles)
+ throws RocksDBException {
+ flushDb(db, handles);
+
+ for (ColumnFamilyHandle handle : handles) {
+ handle.close();
+ }
+
+ db.closeE();
+ }
+
+ private void flushDb(RocksDB db, List handles) throws RocksDBException {
+ // force flush the database
+ for (int i = 0; i < 2; i++) {
+ db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), handles);
+ db.flushWal(true);
+ db.syncWal();
+ }
+ // end force flush
+ }
+
+ private static Options openRocksDb(Path path, boolean crashIfWalError, boolean lowMemory)
+ throws IOException {
+ // Get databases directory path
+ Path databasesDirPath = path.toAbsolutePath().getParent();
+ // Create base directories
+ if (Files.notExists(databasesDirPath)) {
+ Files.createDirectories(databasesDirPath);
+ }
+
+ // the Options class contains a set of configurable DB options
+ // that determines the behaviour of the database.
+ var options = new Options();
+ options.setCreateIfMissing(true);
+ options.setCompactionStyle(CompactionStyle.LEVEL);
+ options.setLevelCompactionDynamicLevelBytes(true);
+ options.setTargetFileSizeBase(64 * 1024 * 1024); // 64MiB sst file
+ options.setMaxBytesForLevelBase(4 * 256 * 1024 * 1024); // 4 times the sst file
+ options.setCompressionType(CompressionType.SNAPPY_COMPRESSION);
+ options.setManualWalFlush(false);
+ options.setMinWriteBufferNumberToMerge(3);
+ options.setMaxWriteBufferNumber(4);
+ options.setWalTtlSeconds(30); // flush wal after 30 seconds
+ options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown
+ options.setAvoidFlushDuringRecovery(false); // Flush all WALs during startup
+ options.setWalRecoveryMode(crashIfWalError ? WALRecoveryMode.AbsoluteConsistency
+ : WALRecoveryMode.PointInTimeRecovery); // Crash if the WALs are corrupted. Default: TolerateCorruptedTailRecords
+ options.setDeleteObsoleteFilesPeriodMicros(20 * 1000000); // 20 seconds
+ options.setPreserveDeletes(false);
+ options.setKeepLogFileNum(10);
+ // Direct I/O parameters. Removed because they use too much disk.
+ //options.setUseDirectReads(true);
+ //options.setUseDirectIoForFlushAndCompaction(true);
+ //options.setCompactionReadaheadSize(2 * 1024 * 1024); // recommend at least 2MB
+ //options.setWritableFileMaxBufferSize(1024 * 1024); // 1MB by default
+ if (lowMemory) {
+ // LOW MEMORY
+ options
+ .setBytesPerSync(1024 * 1024)
+ .setWalBytesPerSync(1024 * 1024)
+ .setIncreaseParallelism(1)
+ .setMaxOpenFiles(2)
+ .optimizeLevelStyleCompaction(1024 * 1024) // 1MiB of ram will be used for level style compaction
+ .setWriteBufferSize(1024 * 1024) // 1MB
+ .setWalSizeLimitMB(16) // 16MB
+ .setMaxTotalWalSize(1024L * 1024L * 1024L) // 1GiB max wal directory size
+ .setDbPaths(List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"),
+ 400L * 1024L * 1024L * 1024L), // 400GiB
+ new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"),
+ 600L * 1024L * 1024L * 1024L))) // 600GiB
+ ;
+ } else {
+ // HIGH MEMORY
+ options
+ .setAllowConcurrentMemtableWrite(true)
+ .setEnableWriteThreadAdaptiveYield(true)
+ .setIncreaseParallelism(Runtime.getRuntime().availableProcessors())
+ .setBytesPerSync(10 * 1024 * 1024)
+ .setWalBytesPerSync(10 * 1024 * 1024)
+ .optimizeLevelStyleCompaction(
+ 128 * 1024 * 1024) // 128MiB of ram will be used for level style compaction
+ .setWriteBufferSize(128 * 1024 * 1024) // 128MB
+ .setWalSizeLimitMB(1024) // 1024MB
+ .setMaxTotalWalSize(8L * 1024L * 1024L * 1024L) // 8GiB max wal directory size
+ .setDbPaths(List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"),
+ 400L * 1024L * 1024L * 1024L), // 400GiB
+ new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"),
+ 600L * 1024L * 1024L * 1024L))) // 600GiB
+ ;
+ }
+
+ final org.rocksdb.BloomFilter bloomFilter = new BloomFilter(10, false);
+ final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
+ tableOptions.setFilterPolicy(bloomFilter);
+ options.setTableFormatConfig(tableOptions);
+
+ return options;
+ }
+
+ private void createAllColumns(List totalDescriptors, Options options,
+ String dbPathString) throws RocksDBException {
+ List columnFamiliesToCreate = new LinkedList<>();
+
+ for (ColumnFamilyDescriptor descriptor : totalDescriptors) {
+ columnFamiliesToCreate.add(descriptor.getName());
+ }
+
+ List existingColumnFamilies = RocksDB.listColumnFamilies(options, dbPathString);
+
+ columnFamiliesToCreate.removeIf((columnFamilyName) -> {
+ for (byte[] cfn : existingColumnFamilies) {
+ if (Arrays.equals(cfn, columnFamilyName)) {
+ return true;
+ }
+ }
+ return false;
+ });
+
+ List descriptors = new LinkedList<>();
+ for (byte[] existingColumnFamily : existingColumnFamilies) {
+ descriptors.add(new ColumnFamilyDescriptor(existingColumnFamily));
+ }
+
+ var handles = new LinkedList();
+
+ /**
+ * SkipStatsUpdateOnDbOpen = true because this RocksDB.open session is used only to add just some columns
+ */
+ //var dbOptionsFastLoadSlowEdit = new DBOptions(options.setSkipStatsUpdateOnDbOpen(true));
+
+ this.db = RocksDB.open(new DBOptions(options), dbPathString, descriptors, handles);
+
+ for (byte[] name : columnFamiliesToCreate) {
+ db.createColumnFamily(new ColumnFamilyDescriptor(name)).close();
+ }
+
+ flushAndCloseDb(db, handles);
+ }
+
+ private void createIfNotExists(List descriptors, Options options,
+ Path dbPath, String dbPathString) throws RocksDBException {
+ if (Files.notExists(dbPath)) {
+ // Check if handles are all different
+ var descriptorsSet = new HashSet<>(descriptors);
+ if (descriptorsSet.size() != descriptors.size()) {
+ throw new IllegalArgumentException("Descriptors must be unique!");
+ }
+
+ List descriptorsToCreate = new LinkedList<>(descriptors);
+ descriptorsToCreate
+ .removeIf((cf) -> Arrays.equals(cf.getName(), DEFAULT_COLUMN_FAMILY.getName()));
+
+ /**
+ * SkipStatsUpdateOnDbOpen = true because this RocksDB.open session is used only to add just some columns
+ */
+ //var dbOptionsFastLoadSlowEdit = options.setSkipStatsUpdateOnDbOpen(true);
+
+ LinkedList handles = new LinkedList<>();
+
+ this.db = RocksDB.open(options, dbPathString);
+ for (ColumnFamilyDescriptor columnFamilyDescriptor : descriptorsToCreate) {
+ handles.add(db.createColumnFamily(columnFamilyDescriptor));
+ }
+
+ flushAndCloseDb(db, handles);
+ }
+ }
+
+ @Override
+ public LLSingleton getSingleton(byte[] singletonListColumnName, byte[] name, byte[] defaultValue)
+ throws IOException {
+ try {
+ return new LLLocalSingleton(db,
+ handles.get(Column.special(Column.toString(singletonListColumnName))),
+ (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
+ LLLocalKeyValueDatabase.this.name,
+ name,
+ defaultValue);
+ } catch (RocksDBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public LLDictionary getDictionary(byte[] columnName) {
+ return new LLLocalDictionary(db,
+ handles.get(Column.special(Column.toString(columnName))),
+ name,
+ (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber())
+ );
+ }
+
+ @Override
+ public LLDeepDictionary getDeepDictionary(byte[] columnName, int keySize, int key2Size) {
+ return new LLLocalDeepDictionary(db,
+ handles.get(Column.special(Column.toString(columnName))),
+ name,
+ (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
+ keySize,
+ key2Size
+ );
+ }
+
+ @Override
+ public long getProperty(String propertyName) throws IOException {
+ try {
+ return db.getAggregatedLongProperty(propertyName);
+ } catch (RocksDBException exception) {
+ throw new IOException(exception);
+ }
+ }
+
+ @Override
+ public LLSnapshot takeSnapshot() {
+ var snapshot = db.getSnapshot();
+ long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement();
+ this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot);
+ return new LLSnapshot(currentSnapshotSequenceNumber);
+ }
+
+ @Override
+ public void releaseSnapshot(LLSnapshot snapshot) throws IOException {
+ Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber());
+ if (dbSnapshot == null) {
+ throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!");
+ }
+ db.releaseSnapshot(dbSnapshot);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ flushAndCloseDb(db, new ArrayList<>(handles.values()));
+ deleteUnusedOldLogFiles();
+ } catch (RocksDBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Call this method ONLY AFTER flushing completely a db and closing it!
+ */
+ private void deleteUnusedOldLogFiles() {
+ Path basePath = dbPath;
+ try {
+ Files
+ .walk(basePath, 1)
+ .filter(p -> !p.equals(basePath))
+ .filter(p -> {
+ var fileName = p.getFileName().toString();
+ if (fileName.startsWith("LOG.old.")) {
+ var parts = fileName.split("\\.");
+ if (parts.length == 3) {
+ try {
+ long nameSuffix = Long.parseUnsignedLong(parts[2]);
+ return true;
+ } catch (NumberFormatException ex) {
+ return false;
+ }
+ }
+ }
+ if (fileName.endsWith(".log")) {
+ var parts = fileName.split("\\.");
+ if (parts.length == 2) {
+ try {
+ int name = Integer.parseUnsignedInt(parts[0]);
+ return true;
+ } catch (NumberFormatException ex) {
+ return false;
+ }
+ }
+ }
+ return false;
+ })
+ .filter(p -> {
+ try {
+ BasicFileAttributes attrs = Files.readAttributes(p, BasicFileAttributes.class);
+ if (attrs.isRegularFile() && !attrs.isSymbolicLink() && !attrs.isDirectory()) {
+ long ctime = attrs.creationTime().toMillis();
+ long atime = attrs.lastAccessTime().toMillis();
+ long mtime = attrs.lastModifiedTime().toMillis();
+ long lastTime = Math.max(Math.max(ctime, atime), mtime);
+ long safeTime;
+ if (p.getFileName().toString().startsWith("LOG.old.")) {
+ safeTime = System.currentTimeMillis() - Duration.ofHours(24).toMillis();
+ } else {
+ safeTime = System.currentTimeMillis() - Duration.ofHours(12).toMillis();
+ }
+ if (lastTime < safeTime) {
+ return true;
+ }
+ }
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ return false;
+ }
+ return false;
+ })
+ .forEach(path -> {
+ try {
+ Files.deleteIfExists(path);
+ System.out.println("Deleted log file \"" + path + "\"");
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ });
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
new file mode 100644
index 0000000..8144fbf
--- /dev/null
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneIndex.java
@@ -0,0 +1,431 @@
+package it.cavallium.dbengine.database.disk;
+
+import it.cavallium.dbengine.database.luceneutil.AdaptiveStreamSearcher;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.SnapshotDeletionPolicy;
+import org.apache.lucene.queries.mlt.MoreLikeThis;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.jetbrains.annotations.Nullable;
+import org.warp.commonutils.functional.IOFunction;
+import org.warp.commonutils.type.ShortNamedThreadFactory;
+import it.cavallium.dbengine.database.LLDocument;
+import it.cavallium.dbengine.database.LLKeyScore;
+import it.cavallium.dbengine.database.LLLuceneIndex;
+import it.cavallium.dbengine.database.LLSnapshot;
+import it.cavallium.dbengine.database.LLSort;
+import it.cavallium.dbengine.database.LLTerm;
+import it.cavallium.dbengine.database.LLTopKeys;
+import it.cavallium.dbengine.database.LLUtils;
+import it.cavallium.dbengine.database.LuceneUtils;
+import it.cavallium.dbengine.database.analyzer.TextFieldsAnalyzer;
+import it.cavallium.dbengine.database.luceneutil.LuceneStreamSearcher;
+import it.cavallium.luceneserializer.luceneserializer.ParseException;
+import it.cavallium.luceneserializer.luceneserializer.QueryParser;
+import reactor.core.publisher.EmitterProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
+
+public class LLLocalLuceneIndex implements LLLuceneIndex {
+
+ // Shared streaming-search strategy; stateless, so one instance serves all indices
+ private static final LuceneStreamSearcher streamSearcher = new AdaptiveStreamSearcher();
+ // Periods of the two background tasks started in the constructor
+ private final Duration queryRefreshDebounceTime;
+ private final Duration commitDebounceTime;
+ // Logical index name; also used to derive the on-disk directory name
+ private final String luceneIndexName;
+ // Deletion policy that pins commit points while snapshots reference them
+ private final SnapshotDeletionPolicy snapshotter;
+ private final IndexWriter indexWriter;
+ private final SearcherManager searcherManager;
+ private final Directory directory;
+ // Millis timestamp of the last searcher refresh (set at construction time)
+ private final AtomicLong lastSearcherRefresh = new AtomicLong(0);
+ /**
+ * Last snapshot sequence number. 0 is not used
+ */
+ private final AtomicLong lastSnapshotSeqNo = new AtomicLong(0);
+ /**
+ * Snapshot seq no to index commit point
+ */
+ // NOTE(review): raw ConcurrentHashMap — the type arguments appear stripped from this
+ // patch; from takeSnapshot()/releaseSnapshot() usage this is presumably
+ // ConcurrentHashMap<Long, LuceneIndexSnapshot>. Verify against the real source.
+ private final ConcurrentHashMap snapshots = new ConcurrentHashMap<>();
+ // Single-threaded executor driving scheduledCommit / scheduledQueryRefresh
+ private final ScheduledExecutorService scheduler;
+ private final boolean lowMemory;
+
+ /**
+ * Opens (or creates) the Lucene index stored in "&lt;name&gt;.lucene.db" under
+ * {@code luceneBasePath}, and starts the periodic commit and searcher-refresh
+ * tasks.
+ *
+ * @param luceneBasePath base directory that will contain the index directory
+ * @param name logical index name; must not be empty
+ * @param analyzer analyzer preset resolved via LuceneUtils.getAnalyzer
+ * @param queryRefreshDebounceTime period between scheduled searcher refreshes
+ * @param commitDebounceTime period between scheduled commits
+ * @param lowMemory if true, smaller writer RAM buffers are configured
+ * @throws IOException if the name is empty or the index cannot be opened
+ */
+ public LLLocalLuceneIndex(Path luceneBasePath,
+ String name,
+ TextFieldsAnalyzer analyzer,
+ Duration queryRefreshDebounceTime,
+ Duration commitDebounceTime,
+ boolean lowMemory) throws IOException {
+ if (name.length() == 0) {
+ throw new IOException("Empty lucene database name");
+ }
+ Path directoryPath = luceneBasePath.resolve(name + ".lucene.db");
+ this.directory = FSDirectory.open(directoryPath);
+ this.luceneIndexName = name;
+ // Keep only the last commit, except for commits pinned by active snapshots
+ this.snapshotter = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
+ this.lowMemory = lowMemory;
+ IndexWriterConfig indexWriterConfig = new IndexWriterConfig(LuceneUtils.getAnalyzer(analyzer));
+ indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE_OR_APPEND);
+ indexWriterConfig.setIndexDeletionPolicy(snapshotter);
+ indexWriterConfig.setCommitOnClose(true);
+ // RAM buffer sizing: small buffers in low-memory mode, larger otherwise
+ if (lowMemory) {
+ indexWriterConfig.setRAMBufferSizeMB(32);
+ indexWriterConfig.setRAMPerThreadHardLimitMB(32);
+ } else {
+ indexWriterConfig.setRAMBufferSizeMB(128);
+ indexWriterConfig.setRAMPerThreadHardLimitMB(512);
+ }
+ this.indexWriter = new IndexWriter(directory, indexWriterConfig);
+ // applyAllDeletes=false, writeAllDeletes=false: cheaper refreshes
+ this.searcherManager = new SearcherManager(indexWriter, false, false, null);
+ this.queryRefreshDebounceTime = queryRefreshDebounceTime;
+ this.commitDebounceTime = commitDebounceTime;
+ this.lastSearcherRefresh.set(System.currentTimeMillis());
+ // Background tasks: periodic commit and periodic searcher refresh
+ this.scheduler = Executors.newSingleThreadScheduledExecutor(new ShortNamedThreadFactory("Lucene"));
+ scheduler.scheduleAtFixedRate(this::scheduledCommit,
+ commitDebounceTime.toMillis(),
+ commitDebounceTime.toMillis(),
+ TimeUnit.MILLISECONDS
+ );
+ scheduler.scheduleAtFixedRate(this::scheduledQueryRefresh,
+ queryRefreshDebounceTime.toMillis(),
+ queryRefreshDebounceTime.toMillis(),
+ TimeUnit.MILLISECONDS
+ );
+ }
+
+ /** Returns the logical name this index was opened with. */
+ @Override
+ public String getLuceneIndexName() {
+ return this.luceneIndexName;
+ }
+
+ /**
+ * Takes a snapshot of the current index state and registers it under a fresh
+ * sequence number (sequence 0 is never handed out).
+ *
+ * @return a handle whose sequence number can later be passed to releaseSnapshot
+ * @throws IOException if the underlying Lucene snapshot fails
+ */
+ @Override
+ public LLSnapshot takeSnapshot() throws IOException {
+
+ long snapshotSeqNo = lastSnapshotSeqNo.incrementAndGet();
+
+ IndexCommit snapshot = takeLuceneSnapshot();
+ // Keep the pinned commit point alive until releaseSnapshot(...) is called
+ this.snapshots.put(snapshotSeqNo, new LuceneIndexSnapshot(snapshot));
+ return new LLSnapshot(snapshotSeqNo);
+ }
+
+ /**
+ * Use internally. This method commits before taking the snapshot if there are no commits in a new database,
+ * avoiding the exception.
+ */
+ private IndexCommit takeLuceneSnapshot() throws IOException {
+ try {
+ return snapshotter.snapshot();
+ } catch (IllegalStateException ex) {
+ // NOTE(review): this matches the exact message Lucene's SnapshotDeletionPolicy
+ // throws for an empty index; the string may change between Lucene versions,
+ // so re-verify this on any Lucene upgrade.
+ if ("No index commit to snapshot".equals(ex.getMessage())) {
+ // New database with no commits yet: commit once, then snapshot succeeds
+ indexWriter.commit();
+ return snapshotter.snapshot();
+ } else {
+ throw ex;
+ }
+ }
+ }
+
+ /**
+ * Releases a previously taken snapshot, closes its resources and lets Lucene
+ * delete the index files the snapshot was pinning.
+ *
+ * @throws IOException if the sequence number does not match a live snapshot
+ */
+ @Override
+ public void releaseSnapshot(LLSnapshot snapshot) throws IOException {
+ var indexSnapshot = this.snapshots.remove(snapshot.getSequenceNumber());
+ if (indexSnapshot == null) {
+ throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!");
+ }
+
+ indexSnapshot.close();
+
+ var luceneIndexSnapshot = indexSnapshot.getSnapshot();
+ snapshotter.release(luceneIndexSnapshot);
+ // Delete unused files after releasing the snapshot
+ indexWriter.deleteUnusedFiles();
+ }
+
+ /**
+ * Adds a document to the index.
+ * NOTE(review): the {@code key} parameter is unused here — presumably the key is
+ * already encoded in the fields of {@code doc}; confirm with the interface contract.
+ */
+ @Override
+ public void addDocument(LLTerm key, LLDocument doc) throws IOException {
+ indexWriter.addDocument(LLUtils.toDocument(doc));
+ }
+
+ /**
+ * Adds a batch of documents to the index in one writer call.
+ * NOTE(review): {@code keys} is unused (same assumption as addDocument), and both
+ * parameters are raw Iterables — the type arguments appear stripped from this patch.
+ */
+ @Override
+ public void addDocuments(Iterable keys, Iterable docs) throws IOException {
+ indexWriter.addDocuments(LLUtils.toDocuments(docs));
+ }
+
+ @Override
+ public void deleteDocument(LLTerm id) throws IOException {
+ indexWriter.deleteDocuments(LLUtils.toTerm(id));
+ }
+
+ @Override
+ public void updateDocument(LLTerm id, LLDocument document) throws IOException {
+ indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(document));
+ }
+
+ /**
+ * Atomically replaces each id's documents with the paired document. The two
+ * iterables are consumed in lockstep, driven by {@code ids}; any extra elements
+ * in {@code documents} are ignored (preserving the original behavior).
+ *
+ * @throws IllegalArgumentException if {@code documents} has fewer elements than
+ *         {@code ids} (the original code crashed with a bare NoSuchElementException)
+ * @throws IOException if the writer fails
+ */
+ @Override
+ public void updateDocuments(Iterable ids, Iterable documents)
+ throws IOException {
+ var idIt = ids.iterator();
+ var docIt = documents.iterator();
+ while (idIt.hasNext()) {
+ // Fail fast with a clear message instead of NoSuchElementException
+ if (!docIt.hasNext()) {
+ throw new IllegalArgumentException("Fewer documents than ids");
+ }
+ var id = idIt.next();
+ var doc = docIt.next();
+
+ indexWriter.updateDocument(LLUtils.toTerm(id), LLUtils.toDocument(doc));
+ }
+ }
+
+ /**
+ * Removes every document from the index and compacts it immediately.
+ * The commit / forceMergeDeletes / flush / commit sequence makes the deletion
+ * durable and reclaims disk space right away; keep the order as-is.
+ */
+ @Override
+ public void deleteAll() throws IOException {
+ indexWriter.deleteAll();
+ indexWriter.commit();
+ indexWriter.forceMergeDeletes(true);
+ indexWriter.flush();
+ indexWriter.commit();
+ }
+
+ @Override
+ public Collection search(@Nullable LLSnapshot snapshot, String queryString, int limit, @Nullable LLSort sort,
+ String keyFieldName)
+ throws IOException {
+ try {
+ var luceneIndexSnapshot = resolveSnapshot(snapshot);
+
+ Query query = QueryParser.parse(queryString);
+ Sort luceneSort = LLUtils.toSort(sort);
+
+ return Collections.singleton(runSearch(luceneIndexSnapshot, (indexSearcher) -> {
+ return blockingSearch(indexSearcher, limit, query, luceneSort, keyFieldName);
+ }));
+ } catch (ParseException e) {
+ throw new IOException("Error during query count!", e);
+ }
+ }
+
+ /**
+ * Finds documents similar to the given reference fields using Lucene's
+ * MoreLikeThis, returning at most {@code limit} key/score results. An empty
+ * reference map short-circuits to an empty result.
+ * NOTE(review): the parameter's type arguments appear stripped from this patch
+ * (shown as "Map>"); from mlt.like(...) usage it is presumably a map of field
+ * name to field values — verify against the real source.
+ */
+ @Override
+ public Collection moreLikeThis(@Nullable LLSnapshot snapshot, Map> mltDocumentFields, int limit,
+ String keyFieldName)
+ throws IOException {
+ var luceneIndexSnapshot = resolveSnapshot(snapshot);
+
+ if (mltDocumentFields.isEmpty()) {
+ return Collections.singleton(new LLTopKeys(0, new LLKeyScore[0]));
+ }
+
+ return Collections.singleton(runSearch(luceneIndexSnapshot, (indexSearcher) -> {
+
+ var mlt = new MoreLikeThis(indexSearcher.getIndexReader());
+ // Use the same analyzer as indexing so the generated query tokenizes identically
+ mlt.setAnalyzer(indexWriter.getAnalyzer());
+ mlt.setFieldNames(mltDocumentFields.keySet().toArray(String[]::new));
+ mlt.setMinTermFreq(1);
+ //mlt.setMinDocFreq(1);
+ mlt.setBoost(true);
+
+ // Get the reference doc and apply it to MoreLikeThis, to generate the query
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Query query = mlt.like((Map) mltDocumentFields);
+
+ // Search
+ return blockingSearch(indexSearcher, limit, query, null, keyFieldName);
+ }));
+ }
+
+ /**
+ * Executes {@code query} on {@code indexSearcher} (sorted when {@code luceneSort}
+ * is non-null) and maps the top hits to key/score pairs read from the stored
+ * {@code keyFieldName} field. Hits whose key field cannot be loaded are logged
+ * to stderr and skipped, so the result may contain fewer entries than hits.
+ */
+ private static LLTopKeys blockingSearch(IndexSearcher indexSearcher,
+ int limit,
+ Query query,
+ Sort luceneSort,
+ String keyFieldName) throws IOException {
+ TopDocs results = luceneSort != null ? indexSearcher.search(query, limit, luceneSort)
+ : indexSearcher.search(query, limit);
+ // Presized ArrayList instead of LinkedList; iterate the hit array directly
+ // (the ObjectArrayList wrapper added nothing)
+ List keyScores = new ArrayList<>(results.scoreDocs.length);
+ for (ScoreDoc hit : results.scoreDocs) {
+ int docId = hit.doc;
+ float score = hit.score;
+ // Load only the key field; fetching the whole document would be wasteful
+ Document d = indexSearcher.doc(docId, Set.of(keyFieldName));
+ if (d.getFields().isEmpty()) {
+ System.err.println("The document docId:" + docId + ",score:" + score + " is empty.");
+ var realFields = indexSearcher.doc(docId).getFields();
+ if (!realFields.isEmpty()) {
+ System.err.println("Present fields:");
+ for (IndexableField field : realFields) {
+ System.err.println(" - " + field.name());
+ }
+ }
+ } else {
+ var field = d.getField(keyFieldName);
+ if (field == null) {
+ System.err.println("Can't get key of document docId:" + docId + ",score:" + score);
+ } else {
+ keyScores.add(new LLKeyScore(field.stringValue(), score));
+ }
+ }
+ }
+ return new LLTopKeys(results.totalHits.value, keyScores.toArray(new LLKeyScore[0]));
+ }
+
+ @SuppressWarnings("UnnecessaryLocalVariable")
+ @Override
+ public Tuple2, Collection>> searchStream(@Nullable LLSnapshot snapshot, String queryString, int limit,
+ @Nullable LLSort sort, String keyFieldName) {
+ try {
+ Query query = QueryParser.parse(queryString);
+ Sort luceneSort = LLUtils.toSort(sort);
+
+ var acquireSearcherWrappedBlocking = Mono
+ .fromCallable(() -> {
+ if (snapshot == null) {
+ return searcherManager.acquire();
+ } else {
+ return resolveSnapshot(snapshot).getIndexSearcher();
+ }
+ })
+ .subscribeOn(Schedulers.boundedElastic());
+
+ EmitterProcessor countProcessor = EmitterProcessor.create();
+ EmitterProcessor resultsProcessor = EmitterProcessor.create();
+
+ var publisher = acquireSearcherWrappedBlocking.flatMapMany(indexSearcher -> {
+ return Flux.