Async migrate
This commit is contained in:
parent
9729704bb9
commit
7a8e3d6158
@ -12,10 +12,18 @@ import java.util.ArrayList;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import org.apache.thrift.TApplicationException;
|
import org.apache.thrift.TApplicationException;
|
||||||
import org.apache.thrift.TException;
|
import org.apache.thrift.TException;
|
||||||
|
import org.apache.thrift.async.AsyncMethodCallback;
|
||||||
|
import org.apache.thrift.async.TAsyncClientManager;
|
||||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||||
|
import org.apache.thrift.transport.TNonblockingSocket;
|
||||||
|
import org.apache.thrift.transport.TNonblockingTransport;
|
||||||
import org.apache.thrift.transport.TSocket;
|
import org.apache.thrift.transport.TSocket;
|
||||||
|
import org.apache.thrift.transport.TTransportException;
|
||||||
import org.apache.thrift.transport.layered.TFramedTransport;
|
import org.apache.thrift.transport.layered.TFramedTransport;
|
||||||
import org.bson.BasicBSONEncoder;
|
import org.bson.BasicBSONEncoder;
|
||||||
import org.bson.BasicBSONObject;
|
import org.bson.BasicBSONObject;
|
||||||
@ -26,15 +34,12 @@ public class Migrate {
|
|||||||
public static void main(String[] args) throws TException, IOException {
|
public static void main(String[] args) throws TException, IOException {
|
||||||
|
|
||||||
// Tunables
|
// Tunables
|
||||||
String n = "401";
|
String n = "402";
|
||||||
var columnName = "peers_dtg-slave-" + n;
|
var columnName = "peers_dtg-slave-" + n;
|
||||||
var columnSchema = new ColumnSchema(List.of(Long.BYTES), List.of(), true);
|
var columnSchema = new ColumnSchema(List.of(Long.BYTES), List.of(), true);
|
||||||
//
|
//
|
||||||
|
|
||||||
var transport = new TFramedTransport(new TSocket("10.0.0.9", 5332));
|
var jo = Document.parse("{\"data\": " + Files.readString(Path.of("/home/cavallium/tmp_export/" + columnName + ".json")) + "}");
|
||||||
transport.open();
|
|
||||||
|
|
||||||
var jo = Document.parse("{\"data\": " + Files.readString(Path.of("/tmp/export/" + columnName + ".json")) + "}");
|
|
||||||
|
|
||||||
List<Document> documents = (List<Document>) jo.get("data");
|
List<Document> documents = (List<Document>) jo.get("data");
|
||||||
jo = null;
|
jo = null;
|
||||||
@ -61,26 +66,60 @@ public class Migrate {
|
|||||||
documents = null;
|
documents = null;
|
||||||
System.gc();
|
System.gc();
|
||||||
System.out.println("parsed documents");
|
System.out.println("parsed documents");
|
||||||
var protocol = new TBinaryProtocol(transport);
|
var protocol = new TBinaryProtocol.Factory();
|
||||||
var client = new RocksDB.Client(protocol);
|
var clients = ThreadLocal.withInitial(() -> {
|
||||||
long columnId = client.createColumn(columnName, columnSchema);
|
try {
|
||||||
var encoder = new BasicBSONEncoder();
|
return new RocksDB.AsyncClient(protocol, new TAsyncClientManager(), new TNonblockingSocket("10.0.0.9", 5332));
|
||||||
long nn = 0;
|
} catch (IOException | TTransportException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
var columnIdFuture = new CompletableFuture<Long>();
|
||||||
|
clients.get().createColumn(columnName, columnSchema, new AsyncMethodCallback<>() {
|
||||||
|
@Override
|
||||||
|
public void onComplete(Long response) {
|
||||||
|
columnIdFuture.complete(response);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Exception exception) {
|
||||||
|
columnIdFuture.completeExceptionally(exception);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
long columnId = columnIdFuture.join();
|
||||||
|
var encoder = ThreadLocal.withInitial(BasicBSONEncoder::new);
|
||||||
|
AtomicLong next = new AtomicLong();
|
||||||
long total = documentMap.size();
|
long total = documentMap.size();
|
||||||
var initTime = Instant.now();
|
var initTime = Instant.now();
|
||||||
for (Entry<Long, BasicBSONObject> longDocumentEntry : documentMap) {
|
documentMap.stream().parallel().forEach(longDocumentEntry -> {
|
||||||
ByteBuffer bb = ByteBuffer.allocate(Long.BYTES);
|
ByteBuffer bb = ByteBuffer.allocate(Long.BYTES);
|
||||||
bb.asLongBuffer().put(longDocumentEntry.getKey());
|
bb.asLongBuffer().put(longDocumentEntry.getKey());
|
||||||
var valueArray = encoder.encode(longDocumentEntry.getValue());
|
var valueArray = encoder.get().encode(longDocumentEntry.getValue());
|
||||||
var valueBuf = ByteBuffer.wrap(valueArray);
|
var valueBuf = ByteBuffer.wrap(valueArray);
|
||||||
client.putFast(0, columnId, List.of(bb), valueBuf);
|
try {
|
||||||
|
var putFuture = new CompletableFuture<Void>();
|
||||||
|
clients.get().putFast(0, columnId, List.of(bb), valueBuf, new AsyncMethodCallback<>() {
|
||||||
|
@Override
|
||||||
|
public void onComplete(Void response) {
|
||||||
|
putFuture.complete(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Exception exception) {
|
||||||
|
putFuture.completeExceptionally(exception);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
putFuture.join();
|
||||||
|
} catch (TException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
var nn = next.incrementAndGet();
|
||||||
if (nn > 0 && nn % 10_000 == 0) {
|
if (nn > 0 && nn % 10_000 == 0) {
|
||||||
var endTime = Instant.now();
|
var endTime = Instant.now();
|
||||||
var dur = Duration.between(initTime, endTime);
|
var dur = Duration.between(initTime, endTime);
|
||||||
System.out.printf("Written %d/%d elements... %.2f, speed: %.2fHz%n", nn, total, ((nn * 100d) / total), nn / (dur.toMillis() / 1000d));
|
System.out.printf("Written %d/%d elements... %.2f, speed: %.2fHz%n", nn, total, ((nn * 100d) / total), nn / (dur.toMillis() / 1000d));
|
||||||
}
|
}
|
||||||
nn++;
|
});
|
||||||
}
|
|
||||||
var endTime = Instant.now();
|
var endTime = Instant.now();
|
||||||
var dur = Duration.between(initTime, endTime);
|
var dur = Duration.between(initTime, endTime);
|
||||||
if (total > 0) System.out.printf("Took %s, speed: %.2fHz%n", dur, total / (dur.toMillis() / 1000d));
|
if (total > 0) System.out.printf("Took %s, speed: %.2fHz%n", dur, total / (dur.toMillis() / 1000d));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user