migrate
This commit is contained in:
parent
7a8e3d6158
commit
ecdb0b05b8
@ -4,64 +4,96 @@ import it.cavallium.rockserver.core.common.api.ColumnSchema;
|
||||
import it.cavallium.rockserver.core.common.api.RocksDB;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HexFormat;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import org.apache.thrift.TApplicationException;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.async.AsyncMethodCallback;
|
||||
import org.apache.thrift.async.TAsyncClientManager;
|
||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||
import org.apache.thrift.transport.TNonblockingSocket;
|
||||
import org.apache.thrift.transport.TNonblockingTransport;
|
||||
import org.apache.thrift.transport.TSocket;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.thrift.transport.layered.TFramedTransport;
|
||||
import org.bson.BSONObject;
|
||||
import org.bson.BasicBSONDecoder;
|
||||
import org.bson.BasicBSONEncoder;
|
||||
import org.bson.BasicBSONObject;
|
||||
import org.bson.Document;
|
||||
|
||||
public class Migrate {
|
||||
|
||||
public static void main(String[] args) throws TException, IOException {
|
||||
public static void main(String[] args) throws TException, IOException, InterruptedException {
|
||||
|
||||
// Tunables
|
||||
String n = "402";
|
||||
var columnName = "peers_dtg-slave-" + n;
|
||||
var columnSchema = new ColumnSchema(List.of(Long.BYTES), List.of(), true);
|
||||
var columnName = args[0];
|
||||
var password = args[1];
|
||||
//
|
||||
|
||||
var jo = Document.parse("{\"data\": " + Files.readString(Path.of("/home/cavallium/tmp_export/" + columnName + ".json")) + "}");
|
||||
System.out.println("Column: " + columnName);
|
||||
var temp = Files.createTempFile("temp-out-" + columnName + "-", ".json");
|
||||
|
||||
boolean peerMode = columnName.startsWith("peers_");
|
||||
var columnSchema = peerMode ? new ColumnSchema(List.of(Long.BYTES), List.of(), true) : new ColumnSchema(List.of(Byte.BYTES), List.of(), true);
|
||||
var result = Runtime
|
||||
.getRuntime()
|
||||
.exec(new String[]{"psql", "--host", "home.cavallium.it", "-d", "sessions", "-U", "postgres", "-c",
|
||||
"SELECT json_agg(t) FROM (SELECT * FROM \"" + columnName + "\") t", "-qAtX",
|
||||
"--output=" + temp}, new String[] {"PGPASSWORD=" + password});
|
||||
result.waitFor();
|
||||
|
||||
var jo = Document.parse("{\"data\": " + Files.readString(temp) + "}");
|
||||
Files.delete(temp);
|
||||
|
||||
List<Document> documents = (List<Document>) jo.get("data");
|
||||
jo = null;
|
||||
System.gc();
|
||||
System.out.println("Read json file");
|
||||
var it = documents.iterator();
|
||||
List<Map.Entry<Long, BasicBSONObject>> documentMap = new ArrayList<>();
|
||||
List<Map.Entry<Object, BasicBSONObject>> documentMap = new ArrayList<>();
|
||||
while (it.hasNext()) {
|
||||
var document = it.next();
|
||||
var obj = new BasicBSONObject();
|
||||
var id = ((Number) document.get("id")).longValue();
|
||||
var accessHash = ((Number) document.get("access_hash")).longValue();
|
||||
var peerType = ((String) document.get("type"));
|
||||
var username = ((String) document.get("username"));
|
||||
var phoneNumber = ((String) document.get("phone_number"));
|
||||
var lastUpdateOn = ((Number) document.get("last_update_on")).longValue();
|
||||
obj.put("access_hash", accessHash);
|
||||
obj.put("peer_type", peerType);
|
||||
obj.put("username", username);
|
||||
obj.put("phone_number", phoneNumber);
|
||||
obj.put("last_update_on", lastUpdateOn);
|
||||
documentMap.add(Map.entry(id, obj));
|
||||
if (peerMode) {
|
||||
var id = ((Number) document.get("id")).longValue();
|
||||
var accessHash = ((Number) document.get("access_hash")).longValue();
|
||||
var peerType = ((String) document.get("type"));
|
||||
var username = ((String) document.get("username"));
|
||||
var phoneNumber = ((String) document.get("phone_number"));
|
||||
var lastUpdateOn = ((Number) document.get("last_update_on")).longValue();
|
||||
obj.put("access_hash", accessHash);
|
||||
obj.put("peer_type", peerType);
|
||||
obj.put("username", username);
|
||||
obj.put("phone_number", phoneNumber);
|
||||
obj.put("last_update_on", lastUpdateOn);
|
||||
if (!peerType.equals("user")) {
|
||||
documentMap.add(Map.entry(id, obj));
|
||||
}
|
||||
} else {
|
||||
byte id = 0;
|
||||
Long dcId = ((Number) document.get("dc_id")).longValue();
|
||||
Long apiId = document.get("api_id") != null ? ((Number) document.get("api_id")).longValue() : null;
|
||||
var testMode = ((Boolean) document.get("test_mode"));
|
||||
var authKey = HexFormat.of().parseHex(((String) document.get("auth_key")).substring(2));
|
||||
var date = ((Number) document.get("date")).longValue();
|
||||
var userId = ((Number) document.get("user_id")).longValue();
|
||||
var isBot = ((Boolean) document.get("is_bot"));
|
||||
var phone = ((String) document.get("phone"));
|
||||
|
||||
obj.put("dc_id", dcId);
|
||||
obj.put("api_id", apiId);
|
||||
obj.put("test_mode", testMode);
|
||||
obj.put("auth_key", authKey);
|
||||
obj.put("date", date);
|
||||
obj.put("user_id", userId);
|
||||
obj.put("is_bot", isBot);
|
||||
obj.put("phone", phone);
|
||||
documentMap.add(Map.entry(id, obj));
|
||||
}
|
||||
}
|
||||
documents = null;
|
||||
System.gc();
|
||||
@ -69,47 +101,32 @@ public class Migrate {
|
||||
var protocol = new TBinaryProtocol.Factory();
|
||||
var clients = ThreadLocal.withInitial(() -> {
|
||||
try {
|
||||
return new RocksDB.AsyncClient(protocol, new TAsyncClientManager(), new TNonblockingSocket("10.0.0.9", 5332));
|
||||
} catch (IOException | TTransportException e) {
|
||||
var socket = new TSocket("10.0.0.9", 5332);
|
||||
var transport = new TFramedTransport(socket);
|
||||
transport.open();
|
||||
|
||||
return new RocksDB.Client(new TBinaryProtocol(transport));
|
||||
} catch (TTransportException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
var columnIdFuture = new CompletableFuture<Long>();
|
||||
clients.get().createColumn(columnName, columnSchema, new AsyncMethodCallback<>() {
|
||||
@Override
|
||||
public void onComplete(Long response) {
|
||||
columnIdFuture.complete(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Exception exception) {
|
||||
columnIdFuture.completeExceptionally(exception);
|
||||
}
|
||||
});
|
||||
long columnId = columnIdFuture.join();
|
||||
var columnId = clients.get().createColumn(columnName, columnSchema);
|
||||
var encoder = ThreadLocal.withInitial(BasicBSONEncoder::new);
|
||||
var keyBBLocal = ThreadLocal.withInitial(() -> peerMode ? ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN) : ByteBuffer.allocate(1).order(ByteOrder.BIG_ENDIAN));
|
||||
AtomicLong next = new AtomicLong();
|
||||
long total = documentMap.size();
|
||||
var initTime = Instant.now();
|
||||
documentMap.stream().parallel().forEach(longDocumentEntry -> {
|
||||
ByteBuffer bb = ByteBuffer.allocate(Long.BYTES);
|
||||
bb.asLongBuffer().put(longDocumentEntry.getKey());
|
||||
ByteBuffer bb = keyBBLocal.get();
|
||||
if (peerMode) {
|
||||
bb.asLongBuffer().put((Long) longDocumentEntry.getKey());
|
||||
} else {
|
||||
bb.put((Byte) longDocumentEntry.getKey()).flip();
|
||||
}
|
||||
var valueArray = encoder.get().encode(longDocumentEntry.getValue());
|
||||
var valueBuf = ByteBuffer.wrap(valueArray);
|
||||
try {
|
||||
var putFuture = new CompletableFuture<Void>();
|
||||
clients.get().putFast(0, columnId, List.of(bb), valueBuf, new AsyncMethodCallback<>() {
|
||||
@Override
|
||||
public void onComplete(Void response) {
|
||||
putFuture.complete(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Exception exception) {
|
||||
putFuture.completeExceptionally(exception);
|
||||
}
|
||||
});
|
||||
putFuture.join();
|
||||
clients.get().putFast(0, columnId, List.of(bb), valueBuf);
|
||||
} catch (TException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@ -120,6 +137,9 @@ public class Migrate {
|
||||
System.out.printf("Written %d/%d elements... %.2f, speed: %.2fHz%n", nn, total, ((nn * 100d) / total), nn / (dur.toMillis() / 1000d));
|
||||
}
|
||||
});
|
||||
if (!peerMode) {
|
||||
System.out.println("Schema: " + new BasicBSONDecoder().readObject(clients.get().get(0, columnId, List.of(ByteBuffer.wrap(new byte[] {0}))).getValue()));
|
||||
}
|
||||
var endTime = Instant.now();
|
||||
var dur = Duration.between(initTime, endTime);
|
||||
if (total > 0) System.out.printf("Took %s, speed: %.2fHz%n", dur, total / (dur.toMillis() / 1000d));
|
||||
|
Loading…
x
Reference in New Issue
Block a user