tdweb: experiments with storing incoming files into indexeddb
GitOrigin-RevId: 7ab38676faaf5eb4e8b4c43b05c268d0ea2784de
This commit is contained in:
parent
1726e10a8a
commit
e07b03dea3
@ -198,11 +198,25 @@ class InboundFileSystem {
|
|||||||
const start = performance.now();
|
const start = performance.now();
|
||||||
try {
|
try {
|
||||||
const ifs = new InboundFileSystem();
|
const ifs = new InboundFileSystem();
|
||||||
|
ifs.pending = [];
|
||||||
|
ifs.pendingHasTimeout = false;
|
||||||
|
ifs.persistCount = 0;
|
||||||
|
ifs.persistSize = 0;
|
||||||
|
ifs.pendingI = 0;
|
||||||
|
ifs.inPersist = false;
|
||||||
|
ifs.totalCount = 0;
|
||||||
|
|
||||||
ifs.root = root;
|
ifs.root = root;
|
||||||
|
|
||||||
ifs.store = localforage.createInstance({
|
//ifs.store = localforage.createInstance({
|
||||||
name: dbName,
|
//name: dbName,
|
||||||
driver: localForageDrivers
|
//driver: localForageDrivers
|
||||||
|
//});
|
||||||
|
log.debug('IDB name: ' + dbName);
|
||||||
|
ifs.idb = new Promise((resolve, reject) => {
|
||||||
|
const request = indexedDB.open(dbName);
|
||||||
|
request.onsuccess = () => resolve(request.result);
|
||||||
|
request.onerror = () => reject(request.error);
|
||||||
});
|
});
|
||||||
|
|
||||||
ifs.load_pids();
|
ifs.load_pids();
|
||||||
@ -221,7 +235,18 @@ class InboundFileSystem {
|
|||||||
async load_pids() {
|
async load_pids() {
|
||||||
const keys_start = performance.now();
|
const keys_start = performance.now();
|
||||||
log.debug('InboundFileSystem::create::keys start');
|
log.debug('InboundFileSystem::create::keys start');
|
||||||
const keys = await this.store.keys();
|
//const keys = await this.store.keys();
|
||||||
|
|
||||||
|
let idb = await this.idb;
|
||||||
|
let read = idb
|
||||||
|
.transaction(['keyvaluepairs'], 'readonly')
|
||||||
|
.objectStore('keyvaluepairs');
|
||||||
|
const keys = await new Promise((resolve, reject) => {
|
||||||
|
const request = read.getAllKeys();
|
||||||
|
request.onsuccess = () => resolve(request.result);
|
||||||
|
request.onerror = () => reject(request.error);
|
||||||
|
});
|
||||||
|
|
||||||
const keys_time = (performance.now() - keys_start) / 1000;
|
const keys_time = (performance.now() - keys_start) / 1000;
|
||||||
log.debug(
|
log.debug(
|
||||||
'InboundFileSystem::create::keys ' + keys_time + ' ' + keys.length
|
'InboundFileSystem::create::keys ' + keys_time + ' ' + keys.length
|
||||||
@ -243,9 +268,18 @@ class InboundFileSystem {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async persist(pid, path, arr) {
|
async doPersist(pid, path, arr, resolve, reject, write) {
|
||||||
|
this.persistCount++;
|
||||||
|
let size = arr.length;
|
||||||
|
this.persistSize += size;
|
||||||
try {
|
try {
|
||||||
await this.store.setItem(pid, new Blob([arr]));
|
//log.debug('persist.do start', pid, path, arr.length);
|
||||||
|
//await this.store.setItem(pid, new Blob([arr]));
|
||||||
|
await new Promise((resolve, reject) => {
|
||||||
|
const request = write.put(new Blob([arr]), pid);
|
||||||
|
request.onsuccess = () => resolve(request.result);
|
||||||
|
request.onerror = () => reject(request.error);
|
||||||
|
});
|
||||||
if (this.pids) {
|
if (this.pids) {
|
||||||
this.pids.add(pid);
|
this.pids.add(pid);
|
||||||
}
|
}
|
||||||
@ -253,12 +287,90 @@ class InboundFileSystem {
|
|||||||
} catch (e) {
|
} catch (e) {
|
||||||
log.error('Failed persist ' + path + ' ', e);
|
log.error('Failed persist ' + path + ' ', e);
|
||||||
}
|
}
|
||||||
|
//log.debug('persist.do finish', pid, path, arr.length);
|
||||||
|
this.persistCount--;
|
||||||
|
this.persistSize -= size;
|
||||||
|
resolve();
|
||||||
|
|
||||||
|
this.tryFinishPersist();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async flushPersist() {
|
||||||
|
if (this.inPersist) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log.debug('persist.flush');
|
||||||
|
this.inPersist = true;
|
||||||
|
let idb = await this.idb;
|
||||||
|
this.writeBegin = performance.now();
|
||||||
|
let write = idb
|
||||||
|
.transaction(['keyvaluepairs'], 'readwrite')
|
||||||
|
.objectStore('keyvaluepairs');
|
||||||
|
while (
|
||||||
|
this.pendingI < this.pending.length &&
|
||||||
|
this.persistCount < 20 &&
|
||||||
|
this.persistSize < 50 << 20
|
||||||
|
) {
|
||||||
|
var q = this.pending[this.pendingI];
|
||||||
|
this.pending[this.pendingI] = null;
|
||||||
|
// TODO: add to transaction
|
||||||
|
this.doPersist(q.pid, q.path, q.arr, q.resolve, q.reject, write);
|
||||||
|
this.pendingI++;
|
||||||
|
this.totalCount++;
|
||||||
|
}
|
||||||
|
log.debug(
|
||||||
|
'persist.flush transaction cnt=' +
|
||||||
|
this.persistCount +
|
||||||
|
', size=' +
|
||||||
|
this.persistSize
|
||||||
|
);
|
||||||
|
this.inPersist = false;
|
||||||
|
this.tryFinishPersist();
|
||||||
|
}
|
||||||
|
|
||||||
|
async tryFinishPersist() {
|
||||||
|
if (this.inPersist) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (this.persistCount !== 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log.debug('persist.finish ' + (performance.now() - this.writeBegin) / 1000);
|
||||||
|
if (this.pendingI === this.pending.length) {
|
||||||
|
this.pending = [];
|
||||||
|
this.pendingHasTimeout = false;
|
||||||
|
this.pendingI = 0;
|
||||||
|
log.debug('persist.finish done');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log.debug('persist.finish continue');
|
||||||
|
this.flushPersist();
|
||||||
|
}
|
||||||
|
|
||||||
|
async persist(pid, path, arr) {
|
||||||
|
if (!this.pendingHasTimeout) {
|
||||||
|
this.pendingHasTimeout = true;
|
||||||
|
log.debug('persist set timeout');
|
||||||
|
setTimeout(() => {
|
||||||
|
this.flushPersist();
|
||||||
|
}, 1);
|
||||||
|
}
|
||||||
|
await new Promise((resolve, reject) => {
|
||||||
|
this.pending.push({
|
||||||
|
pid: pid,
|
||||||
|
path: path,
|
||||||
|
arr: arr,
|
||||||
|
resolve: resolve,
|
||||||
|
reject: reject
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
async unlink(pid) {
|
async unlink(pid) {
|
||||||
log.debug('Unlink ' + pid);
|
log.debug('Unlink ' + pid);
|
||||||
try {
|
try {
|
||||||
this.forget(pid);
|
this.forget(pid);
|
||||||
await this.store.removeItem(pid);
|
//await this.store.removeItem(pid);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
log.error('Failed unlink ' + pid + ' ', e);
|
log.error('Failed unlink ' + pid + ' ', e);
|
||||||
}
|
}
|
||||||
@ -551,6 +663,14 @@ class TdClient {
|
|||||||
this.client = this.td_functions.td_create();
|
this.client = this.td_functions.td_create();
|
||||||
|
|
||||||
this.savingFiles = new Map();
|
this.savingFiles = new Map();
|
||||||
|
this.send({
|
||||||
|
'@type': 'setOption',
|
||||||
|
name: 'store_all_files_in_files_directory',
|
||||||
|
value: {
|
||||||
|
'@type': 'optionValueBoolean',
|
||||||
|
value: true
|
||||||
|
}
|
||||||
|
});
|
||||||
this.send({
|
this.send({
|
||||||
'@type': 'setOption',
|
'@type': 'setOption',
|
||||||
name: 'language_pack_database_path',
|
name: 'language_pack_database_path',
|
||||||
|
@ -86,6 +86,12 @@ class Global : public ActorContext {
|
|||||||
Slice get_dir() const {
|
Slice get_dir() const {
|
||||||
return parameters_.database_directory;
|
return parameters_.database_directory;
|
||||||
}
|
}
|
||||||
|
Slice get_secure_files_dir() const {
|
||||||
|
if (store_all_files_in_files_directory_) {
|
||||||
|
return get_files_dir();
|
||||||
|
}
|
||||||
|
return get_dir();
|
||||||
|
}
|
||||||
Slice get_files_dir() const {
|
Slice get_files_dir() const {
|
||||||
return parameters_.files_directory;
|
return parameters_.files_directory;
|
||||||
}
|
}
|
||||||
@ -357,6 +363,10 @@ class Global : public ActorContext {
|
|||||||
|
|
||||||
void add_location_access_hash(double latitude, double longitude, int64 access_hash);
|
void add_location_access_hash(double latitude, double longitude, int64 access_hash);
|
||||||
|
|
||||||
|
void set_store_all_files_in_files_directory(bool flag) {
|
||||||
|
store_all_files_in_files_directory_ = flag;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::shared_ptr<DhConfig> dh_config_;
|
std::shared_ptr<DhConfig> dh_config_;
|
||||||
|
|
||||||
@ -390,6 +400,7 @@ class Global : public ActorContext {
|
|||||||
int32 gc_scheduler_id_;
|
int32 gc_scheduler_id_;
|
||||||
int32 slow_net_scheduler_id_;
|
int32 slow_net_scheduler_id_;
|
||||||
|
|
||||||
|
std::atomic<bool> store_all_files_in_files_directory_{false};
|
||||||
std::atomic<double> server_time_difference_{0.0};
|
std::atomic<double> server_time_difference_{0.0};
|
||||||
std::atomic<bool> server_time_difference_was_updated_{false};
|
std::atomic<bool> server_time_difference_was_updated_{false};
|
||||||
std::atomic<double> dns_time_difference_{0.0};
|
std::atomic<double> dns_time_difference_{0.0};
|
||||||
|
@ -6957,6 +6957,11 @@ void Td::on_request(uint64 id, td_api::setOption &request) {
|
|||||||
if (set_integer_option("storage_immunity_delay")) {
|
if (set_integer_option("storage_immunity_delay")) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
if (set_boolean_option("store_all_files_in_files_directory")) {
|
||||||
|
bool value = static_cast<const td_api::optionValueBoolean *>(request.value_.get())->value_;
|
||||||
|
G()->set_store_all_files_in_files_directory(value);
|
||||||
|
return;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case 'X':
|
case 'X':
|
||||||
case 'x': {
|
case 'x': {
|
||||||
|
@ -171,7 +171,7 @@ Result<FullLocalFileLocation> save_file_bytes(FileType type, BufferSlice bytes,
|
|||||||
static Slice get_file_base_dir(const FileDirType &file_dir_type) {
|
static Slice get_file_base_dir(const FileDirType &file_dir_type) {
|
||||||
switch (file_dir_type) {
|
switch (file_dir_type) {
|
||||||
case FileDirType::Secure:
|
case FileDirType::Secure:
|
||||||
return G()->get_dir();
|
return G()->get_secure_files_dir();
|
||||||
case FileDirType::Common:
|
case FileDirType::Common:
|
||||||
return G()->get_files_dir();
|
return G()->get_files_dir();
|
||||||
default:
|
default:
|
||||||
|
Loading…
Reference in New Issue
Block a user