diff --git a/src/main/java/it/cavallium/rockserver/core/Migrate.java b/src/main/java/it/cavallium/rockserver/core/Migrate.java index 5e8f123..4f8fdff 100644 --- a/src/main/java/it/cavallium/rockserver/core/Migrate.java +++ b/src/main/java/it/cavallium/rockserver/core/Migrate.java @@ -12,10 +12,18 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.async.TAsyncClientManager; import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TNonblockingSocket; +import org.apache.thrift.transport.TNonblockingTransport; import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.layered.TFramedTransport; import org.bson.BasicBSONEncoder; import org.bson.BasicBSONObject; @@ -26,15 +34,12 @@ public class Migrate { public static void main(String[] args) throws TException, IOException { // Tunables - String n = "401"; + String n = "402"; var columnName = "peers_dtg-slave-" + n; var columnSchema = new ColumnSchema(List.of(Long.BYTES), List.of(), true); // - var transport = new TFramedTransport(new TSocket("10.0.0.9", 5332)); - transport.open(); - - var jo = Document.parse("{\"data\": " + Files.readString(Path.of("/tmp/export/" + columnName + ".json")) + "}"); + var jo = Document.parse("{\"data\": " + Files.readString(Path.of("/home/cavallium/tmp_export/" + columnName + ".json")) + "}"); List documents = (List) jo.get("data"); jo = null; @@ -61,26 +66,60 @@ public class Migrate { documents = null; System.gc(); System.out.println("parsed documents"); - var protocol = new TBinaryProtocol(transport); - var client = new RocksDB.Client(protocol); - long columnId = client.createColumn(columnName, columnSchema); - var encoder = new BasicBSONEncoder(); - long nn = 0; + var protocol = new TBinaryProtocol.Factory(); + var clients = ThreadLocal.withInitial(() -> { + try { + return new RocksDB.AsyncClient(protocol, new TAsyncClientManager(), new TNonblockingSocket("10.0.0.9", 5332)); + } catch (IOException | TTransportException e) { + throw new RuntimeException(e); + } + }); + var columnIdFuture = new CompletableFuture(); + clients.get().createColumn(columnName, columnSchema, new AsyncMethodCallback<>() { + @Override + public void onComplete(Long response) { + columnIdFuture.complete(response); + } + + @Override + public void onError(Exception exception) { + columnIdFuture.completeExceptionally(exception); + } + }); + long columnId = columnIdFuture.join(); + var encoder = ThreadLocal.withInitial(BasicBSONEncoder::new); + AtomicLong next = new AtomicLong(); long total = documentMap.size(); var initTime = Instant.now(); - for (Entry longDocumentEntry : documentMap) { + documentMap.stream().parallel().forEach(longDocumentEntry -> { ByteBuffer bb = ByteBuffer.allocate(Long.BYTES); bb.asLongBuffer().put(longDocumentEntry.getKey()); - var valueArray = encoder.encode(longDocumentEntry.getValue()); + var valueArray = encoder.get().encode(longDocumentEntry.getValue()); var valueBuf = ByteBuffer.wrap(valueArray); - client.putFast(0, columnId, List.of(bb), valueBuf); + try { + var putFuture = new CompletableFuture(); + clients.get().putFast(0, columnId, List.of(bb), valueBuf, new AsyncMethodCallback<>() { + @Override + public void onComplete(Void response) { + putFuture.complete(null); + } + + @Override + public void onError(Exception exception) { + putFuture.completeExceptionally(exception); + } + }); + putFuture.join(); + } catch (TException e) { + throw new RuntimeException(e); + } + var nn = next.incrementAndGet(); if (nn > 0 && nn % 10_000 == 0) { var endTime = Instant.now(); var dur = Duration.between(initTime, endTime); System.out.printf("Written %d/%d elements... %.2f, speed: %.2fHz%n", nn, total, ((nn * 100d) / total), nn / (dur.toMillis() / 1000d)); } - nn++; - } + }); var endTime = Instant.now(); var dur = Duration.between(initTime, endTime); if (total > 0) System.out.printf("Took %s, speed: %.2fHz%n", dur, total / (dur.toMillis() / 1000d));