Improve Rx pipeline

This commit is contained in:
topjohnwu 2019-07-28 14:49:06 -07:00
parent b2e6ba3c4a
commit c15f80b33f
2 changed files with 30 additions and 29 deletions

View File

@ -6,6 +6,7 @@ import com.topjohnwu.magisk.data.database.RepoDao
import com.topjohnwu.magisk.data.network.GithubApiServices
import com.topjohnwu.magisk.model.entity.module.Repo
import io.reactivex.Flowable
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers
import se.ansman.kotshi.JsonSerializable
import timber.log.Timber
@ -18,19 +19,11 @@ class RepoUpdater(
private val api: GithubApiServices,
private val repoDB: RepoDao
) {
private lateinit var cached: MutableSet<String>
private val dateFormat: SimpleDateFormat
get() {
val format = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.US)
format.timeZone = TimeZone.getTimeZone("UTC")
return format
}
private fun loadRepos(repos: List<GithubRepoInfo>) = Flowable.fromIterable(repos)
.parallel().runOn(Schedulers.io()).map {
private fun loadRepos(repos: List<GithubRepoInfo>, cached: MutableSet<String>) =
Flowable.fromIterable(repos).map {
it.id to dateFormat.parse(it.pushed_at)!!
}.map {
}.parallel().runOn(Schedulers.io()).map {
// Skip submission
if (it.first == "submission")
return@map
@ -42,26 +35,29 @@ class RepoUpdater(
}.getOrElse { Timber.e(it) }
}.sequential()
private fun loadPage(page: Int, etag: String = ""): Flowable<Unit> =
api.fetchRepos(page, etag).flatMap {
it.error()?.also { throw it }
private fun loadPage(
cached: MutableSet<String>,
page: Int = 1,
etag: String = ""
): Flowable<Unit> = api.fetchRepos(page, etag).flatMap {
it.error()?.also { throw it }
it.response()?.run {
if (code() == HttpURLConnection.HTTP_NOT_MODIFIED)
throw CachedException()
return@run Flowable.error<Unit>(CachedException)
if (page == 1)
repoDB.etagKey = headers()[Const.Key.ETAG_KEY].orEmpty().trimEtag()
val flow = loadRepos(body()!!)
val flow = loadRepos(body()!!, cached)
if (headers()[Const.Key.LINK_KEY].orEmpty().contains("next")) {
flow.mergeWith(loadPage(page + 1))
flow.mergeWith(loadPage(cached, page + 1))
} else {
flow
}
}
}
private fun forcedReload() = Flowable.fromIterable(cached)
private fun forcedReload(cached: MutableSet<String>) = Flowable.fromIterable(cached)
.parallel().runOn(Schedulers.io()).map {
runCatching {
Repo(it).update()
@ -70,22 +66,29 @@ class RepoUpdater(
private fun String.trimEtag() = substring(indexOf('\"'), lastIndexOf('\"') + 1)
operator fun invoke(forced: Boolean = false) : Flowable<Unit> {
cached = Collections.synchronizedSet(HashSet(repoDB.repoIDSet))
return loadPage(1, repoDB.etagKey).doOnComplete {
operator fun invoke(forced: Boolean = false) : Single<Unit> {
val cached = Collections.synchronizedSet(HashSet(repoDB.repoIDSet))
return loadPage(cached, etag = repoDB.etagKey).doOnComplete {
repoDB.removeRepos(cached.toList())
cached.clear()
}.onErrorResumeNext { it: Throwable ->
cached.clear()
if (it is CachedException) {
if (forced) forcedReload() else Flowable.empty()
if (forced)
return@onErrorResumeNext forcedReload(cached)
} else {
Flowable.error(it)
Timber.e(it)
}
}
Flowable.empty()
}.collect({}, {_, _ -> })
}
class CachedException : Exception()
companion object {
private val dateFormat: SimpleDateFormat =
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.US).apply {
timeZone = TimeZone.getTimeZone("UTC")
}
}
object CachedException : Exception()
}
@JsonSerializable

View File

@ -63,9 +63,7 @@ class ModuleViewModel(
.toList()
.map { it to itemsInstalled.calculateDiff(it) }
.doOnSuccessUi { itemsInstalled.update(it.first, it.second) }
.toFlowable()
.flatMap { repoUpdater(force) }
.collect({}, {_, _ -> })
.flattenAsFlowable { repoDB.repos }
.map { RepoRvItem(it) }
.toList()