From ecdb0b05b878676148f2c1c7c4e3377f22d8cec9 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 30 Mar 2024 23:13:31 +0100 Subject: [PATCH] migrate --- .../it/cavallium/rockserver/core/Migrate.java | 132 ++++++++++-------- 1 file changed, 76 insertions(+), 56 deletions(-) diff --git a/src/main/java/it/cavallium/rockserver/core/Migrate.java b/src/main/java/it/cavallium/rockserver/core/Migrate.java index 4f8fdff..aeda198 100644 --- a/src/main/java/it/cavallium/rockserver/core/Migrate.java +++ b/src/main/java/it/cavallium/rockserver/core/Migrate.java @@ -4,64 +4,96 @@ import it.cavallium.rockserver.core.common.api.ColumnSchema; import it.cavallium.rockserver.core.common.api.RocksDB; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; +import java.util.HexFormat; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.LongAdder; -import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; -import org.apache.thrift.async.AsyncMethodCallback; -import org.apache.thrift.async.TAsyncClientManager; import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.transport.TNonblockingSocket; -import org.apache.thrift.transport.TNonblockingTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransportException; import org.apache.thrift.transport.layered.TFramedTransport; +import org.bson.BSONObject; +import org.bson.BasicBSONDecoder; import org.bson.BasicBSONEncoder; import org.bson.BasicBSONObject; import org.bson.Document; public class Migrate { - public static void main(String[] args) throws TException, IOException { + public static void main(String[] args) throws TException, IOException, InterruptedException { // Tunables - String n = "402"; - var columnName = "peers_dtg-slave-" + n; - var columnSchema = new ColumnSchema(List.of(Long.BYTES), List.of(), true); + var columnName = args[0]; + var password = args[1]; // - var jo = Document.parse("{\"data\": " + Files.readString(Path.of("/home/cavallium/tmp_export/" + columnName + ".json")) + "}"); + System.out.println("Column: " + columnName); + var temp = Files.createTempFile("temp-out-" + columnName + "-", ".json"); + + boolean peerMode = columnName.startsWith("peers_"); + var columnSchema = peerMode ? new ColumnSchema(List.of(Long.BYTES), List.of(), true) : new ColumnSchema(List.of(Byte.BYTES), List.of(), true); + var result = Runtime + .getRuntime() + .exec(new String[]{"psql", "--host", "home.cavallium.it", "-d", "sessions", "-U", "postgres", "-c", + "SELECT json_agg(t) FROM (SELECT * FROM \"" + columnName + "\") t", "-qAtX", + "--output=" + temp}, new String[] {"PGPASSWORD=" + password}); + result.waitFor(); + + var jo = Document.parse("{\"data\": " + Files.readString(temp) + "}"); + Files.delete(temp); List documents = (List) jo.get("data"); jo = null; System.gc(); System.out.println("Read json file"); var it = documents.iterator(); - List> documentMap = new ArrayList<>(); + List> documentMap = new ArrayList<>(); while (it.hasNext()) { var document = it.next(); var obj = new BasicBSONObject(); - var id = ((Number) document.get("id")).longValue(); - var accessHash = ((Number) document.get("access_hash")).longValue(); - var peerType = ((String) document.get("type")); - var username = ((String) document.get("username")); - var phoneNumber = ((String) document.get("phone_number")); - var lastUpdateOn = ((Number) document.get("last_update_on")).longValue(); - obj.put("access_hash", accessHash); - obj.put("peer_type", peerType); - obj.put("username", username); - obj.put("phone_number", phoneNumber); - obj.put("last_update_on", lastUpdateOn); - documentMap.add(Map.entry(id, obj)); + if (peerMode) { + var id = ((Number) document.get("id")).longValue(); + var accessHash = ((Number) document.get("access_hash")).longValue(); + var peerType = ((String) document.get("type")); + var username = ((String) document.get("username")); + var phoneNumber = ((String) document.get("phone_number")); + var lastUpdateOn = ((Number) document.get("last_update_on")).longValue(); + obj.put("access_hash", accessHash); + obj.put("peer_type", peerType); + obj.put("username", username); + obj.put("phone_number", phoneNumber); + obj.put("last_update_on", lastUpdateOn); + if (!peerType.equals("user")) { + documentMap.add(Map.entry(id, obj)); + } + } else { + byte id = 0; + Long dcId = ((Number) document.get("dc_id")).longValue(); + Long apiId = document.get("api_id") != null ? ((Number) document.get("api_id")).longValue() : null; + var testMode = ((Boolean) document.get("test_mode")); + var authKey = HexFormat.of().parseHex(((String) document.get("auth_key")).substring(2)); + var date = ((Number) document.get("date")).longValue(); + var userId = ((Number) document.get("user_id")).longValue(); + var isBot = ((Boolean) document.get("is_bot")); + var phone = ((String) document.get("phone")); + + obj.put("dc_id", dcId); + obj.put("api_id", apiId); + obj.put("test_mode", testMode); + obj.put("auth_key", authKey); + obj.put("date", date); + obj.put("user_id", userId); + obj.put("is_bot", isBot); + obj.put("phone", phone); + documentMap.add(Map.entry(id, obj)); + } } documents = null; System.gc(); @@ -69,47 +101,32 @@ public class Migrate { var protocol = new TBinaryProtocol.Factory(); var clients = ThreadLocal.withInitial(() -> { try { - return new RocksDB.AsyncClient(protocol, new TAsyncClientManager(), new TNonblockingSocket("10.0.0.9", 5332)); - } catch (IOException | TTransportException e) { + var socket = new TSocket("10.0.0.9", 5332); + var transport = new TFramedTransport(socket); + transport.open(); + + return new RocksDB.Client(new TBinaryProtocol(transport)); + } catch (TTransportException e) { throw new RuntimeException(e); } }); - var columnIdFuture = new CompletableFuture(); - clients.get().createColumn(columnName, columnSchema, new AsyncMethodCallback<>() { - @Override - public void onComplete(Long response) { - columnIdFuture.complete(response); - } - - @Override - public void onError(Exception exception) { - columnIdFuture.completeExceptionally(exception); - } - }); - long columnId = columnIdFuture.join(); + var columnId = clients.get().createColumn(columnName, columnSchema); var encoder = ThreadLocal.withInitial(BasicBSONEncoder::new); + var keyBBLocal = ThreadLocal.withInitial(() -> peerMode ? ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN) : ByteBuffer.allocate(1).order(ByteOrder.BIG_ENDIAN)); AtomicLong next = new AtomicLong(); long total = documentMap.size(); var initTime = Instant.now(); documentMap.stream().parallel().forEach(longDocumentEntry -> { - ByteBuffer bb = ByteBuffer.allocate(Long.BYTES); - bb.asLongBuffer().put(longDocumentEntry.getKey()); + ByteBuffer bb = keyBBLocal.get(); + if (peerMode) { + bb.asLongBuffer().put((Long) longDocumentEntry.getKey()); + } else { + bb.put((Byte) longDocumentEntry.getKey()).flip(); + } var valueArray = encoder.get().encode(longDocumentEntry.getValue()); var valueBuf = ByteBuffer.wrap(valueArray); try { - var putFuture = new CompletableFuture(); - clients.get().putFast(0, columnId, List.of(bb), valueBuf, new AsyncMethodCallback<>() { - @Override - public void onComplete(Void response) { - putFuture.complete(null); - } - - @Override - public void onError(Exception exception) { - putFuture.completeExceptionally(exception); - } - }); - putFuture.join(); + clients.get().putFast(0, columnId, List.of(bb), valueBuf); } catch (TException e) { throw new RuntimeException(e); } @@ -120,6 +137,9 @@ public class Migrate { System.out.printf("Written %d/%d elements... %.2f, speed: %.2fHz%n", nn, total, ((nn * 100d) / total), nn / (dur.toMillis() / 1000d)); } }); + if (!peerMode) { + System.out.println("Schema: " + new BasicBSONDecoder().readObject(clients.get().get(0, columnId, List.of(ByteBuffer.wrap(new byte[] {0}))).getValue())); + } var endTime = Instant.now(); var dur = Duration.between(initTime, endTime); if (total > 0) System.out.printf("Took %s, speed: %.2fHz%n", dur, total / (dur.toMillis() / 1000d));