From e07b03dea375ec452d453f2d5be1889c9cfa0261 Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Wed, 24 Jul 2019 15:07:55 +0300 Subject: [PATCH] tdweb: experiments with storing incoming files into indexeddb GitOrigin-RevId: 7ab38676faaf5eb4e8b4c43b05c268d0ea2784de --- example/web/tdweb/src/worker.js | 134 ++++++++++++++++++++++++-- td/telegram/Global.h | 11 +++ td/telegram/Td.cpp | 5 + td/telegram/files/FileLoaderUtils.cpp | 2 +- 4 files changed, 144 insertions(+), 8 deletions(-) diff --git a/example/web/tdweb/src/worker.js b/example/web/tdweb/src/worker.js index 500e9a45..da95ead6 100644 --- a/example/web/tdweb/src/worker.js +++ b/example/web/tdweb/src/worker.js @@ -198,11 +198,25 @@ class InboundFileSystem { const start = performance.now(); try { const ifs = new InboundFileSystem(); + ifs.pending = []; + ifs.pendingHasTimeout = false; + ifs.persistCount = 0; + ifs.persistSize = 0; + ifs.pendingI = 0; + ifs.inPersist = false; + ifs.totalCount = 0; + ifs.root = root; - ifs.store = localforage.createInstance({ - name: dbName, - driver: localForageDrivers + //ifs.store = localforage.createInstance({ + //name: dbName, + //driver: localForageDrivers + //}); + log.debug('IDB name: ' + dbName); + ifs.idb = new Promise((resolve, reject) => { + const request = indexedDB.open(dbName); + request.onsuccess = () => resolve(request.result); + request.onerror = () => reject(request.error); }); ifs.load_pids(); @@ -221,7 +235,18 @@ class InboundFileSystem { async load_pids() { const keys_start = performance.now(); log.debug('InboundFileSystem::create::keys start'); - const keys = await this.store.keys(); + //const keys = await this.store.keys(); + + let idb = await this.idb; + let read = idb + .transaction(['keyvaluepairs'], 'readonly') + .objectStore('keyvaluepairs'); + const keys = await new Promise((resolve, reject) => { + const request = read.getAllKeys(); + request.onsuccess = () => resolve(request.result); + request.onerror = () => reject(request.error); + }); + const keys_time = (performance.now() - keys_start) / 1000; log.debug( 'InboundFileSystem::create::keys ' + keys_time + ' ' + keys.length @@ -243,9 +268,18 @@ class InboundFileSystem { } } - async persist(pid, path, arr) { + async doPersist(pid, path, arr, resolve, reject, write) { + this.persistCount++; + let size = arr.length; + this.persistSize += size; try { - await this.store.setItem(pid, new Blob([arr])); + //log.debug('persist.do start', pid, path, arr.length); + //await this.store.setItem(pid, new Blob([arr])); + await new Promise((resolve, reject) => { + const request = write.put(new Blob([arr]), pid); + request.onsuccess = () => resolve(request.result); + request.onerror = () => reject(request.error); + }); if (this.pids) { this.pids.add(pid); } @@ -253,12 +287,90 @@ class InboundFileSystem { } catch (e) { log.error('Failed persist ' + path + ' ', e); } + //log.debug('persist.do finish', pid, path, arr.length); + this.persistCount--; + this.persistSize -= size; + resolve(); + + this.tryFinishPersist(); } + + async flushPersist() { + if (this.inPersist) { + return; + } + log.debug('persist.flush'); + this.inPersist = true; + let idb = await this.idb; + this.writeBegin = performance.now(); + let write = idb + .transaction(['keyvaluepairs'], 'readwrite') + .objectStore('keyvaluepairs'); + while ( + this.pendingI < this.pending.length && + this.persistCount < 20 && + this.persistSize < 50 << 20 + ) { + var q = this.pending[this.pendingI]; + this.pending[this.pendingI] = null; + // TODO: add to transaction + this.doPersist(q.pid, q.path, q.arr, q.resolve, q.reject, write); + this.pendingI++; + this.totalCount++; + } + log.debug( + 'persist.flush transaction cnt=' + + this.persistCount + + ', size=' + + this.persistSize + ); + this.inPersist = false; + this.tryFinishPersist(); + } + + async tryFinishPersist() { + if (this.inPersist) { + return; + } + if (this.persistCount !== 0) { + return; + } + log.debug('persist.finish ' + (performance.now() - this.writeBegin) / 1000); + if (this.pendingI === this.pending.length) { + this.pending = []; + this.pendingHasTimeout = false; + this.pendingI = 0; + log.debug('persist.finish done'); + return; + } + log.debug('persist.finish continue'); + this.flushPersist(); + } + + async persist(pid, path, arr) { + if (!this.pendingHasTimeout) { + this.pendingHasTimeout = true; + log.debug('persist set timeout'); + setTimeout(() => { + this.flushPersist(); + }, 1); + } + await new Promise((resolve, reject) => { + this.pending.push({ + pid: pid, + path: path, + arr: arr, + resolve: resolve, + reject: reject + }); + }); + } + async unlink(pid) { log.debug('Unlink ' + pid); try { this.forget(pid); - await this.store.removeItem(pid); + //await this.store.removeItem(pid); } catch (e) { log.error('Failed unlink ' + pid + ' ', e); } @@ -551,6 +663,14 @@ class TdClient { this.client = this.td_functions.td_create(); this.savingFiles = new Map(); + this.send({ + '@type': 'setOption', + name: 'store_all_files_in_files_directory', + value: { + '@type': 'optionValueBoolean', + value: true + } + }); this.send({ '@type': 'setOption', name: 'language_pack_database_path', diff --git a/td/telegram/Global.h b/td/telegram/Global.h index b9236ffa..564dc8e2 100644 --- a/td/telegram/Global.h +++ b/td/telegram/Global.h @@ -86,6 +86,12 @@ class Global : public ActorContext { Slice get_dir() const { return parameters_.database_directory; } + Slice get_secure_files_dir() const { + if (store_all_files_in_files_directory_) { + return get_files_dir(); + } + return get_dir(); + } Slice get_files_dir() const { return parameters_.files_directory; } @@ -357,6 +363,10 @@ class Global : public ActorContext { void add_location_access_hash(double latitude, double longitude, int64 access_hash); + void set_store_all_files_in_files_directory(bool flag) { + store_all_files_in_files_directory_ = flag; + } + private: std::shared_ptr dh_config_; @@ -390,6 +400,7 @@ class Global : public ActorContext { int32 gc_scheduler_id_; int32 slow_net_scheduler_id_; + std::atomic store_all_files_in_files_directory_{false}; std::atomic server_time_difference_{0.0}; std::atomic server_time_difference_was_updated_{false}; std::atomic dns_time_difference_{0.0}; diff --git a/td/telegram/Td.cpp b/td/telegram/Td.cpp index 758ec4df..b870ebc5 100644 --- a/td/telegram/Td.cpp +++ b/td/telegram/Td.cpp @@ -6957,6 +6957,11 @@ void Td::on_request(uint64 id, td_api::setOption &request) { if (set_integer_option("storage_immunity_delay")) { return; } + if (set_boolean_option("store_all_files_in_files_directory")) { + bool value = static_cast(request.value_.get())->value_; + G()->set_store_all_files_in_files_directory(value); + return; + } break; case 'X': case 'x': { diff --git a/td/telegram/files/FileLoaderUtils.cpp b/td/telegram/files/FileLoaderUtils.cpp index d65af2a0..b39b301e 100644 --- a/td/telegram/files/FileLoaderUtils.cpp +++ b/td/telegram/files/FileLoaderUtils.cpp @@ -171,7 +171,7 @@ Result save_file_bytes(FileType type, BufferSlice bytes, static Slice get_file_base_dir(const FileDirType &file_dir_type) { switch (file_dir_type) { case FileDirType::Secure: - return G()->get_dir(); + return G()->get_secure_files_dir(); case FileDirType::Common: return G()->get_files_dir(); default: