More tweaking to Rx pipeline

This commit is contained in:
topjohnwu 2019-08-01 23:08:58 -07:00
parent 9784353223
commit cb3f9b9740
2 changed files with 22 additions and 23 deletions

View File

@ -7,7 +7,7 @@ import com.topjohnwu.magisk.model.entity.module.Repo
@Dao
abstract class RepoDao {
val repoIDSet: Set<String> get() = getRepoID().map { it.id }.toSet()
val repoIDList get() = getRepoID().map { it.id }
val repos: List<Repo> get() = when (Config.repoOrder) {
Config.Value.ORDER_NAME -> getReposNameOrder()
@ -45,7 +45,7 @@ abstract class RepoDao {
abstract fun removeRepo(id: String)
@Query("DELETE FROM repos WHERE id IN (:idList)")
abstract fun removeRepos(idList: List<String>)
abstract fun removeRepos(idList: Collection<String>)
@Query("SELECT * FROM etag")
protected abstract fun etagRaw(): RepoEtag?

View File

@ -7,6 +7,7 @@ import com.topjohnwu.magisk.data.network.GithubApiServices
import com.topjohnwu.magisk.model.entity.module.Repo
import io.reactivex.Flowable
import io.reactivex.Single
import io.reactivex.rxkotlin.toFlowable
import io.reactivex.schedulers.Schedulers
import se.ansman.kotshi.JsonSerializable
import timber.log.Timber
@ -21,16 +22,13 @@ class RepoUpdater(
) {
private fun loadRepos(repos: List<GithubRepoInfo>, cached: MutableSet<String>) =
Flowable.fromIterable(repos).map {
it.id to dateFormat.parse(it.pushed_at)!!
}.parallel().runOn(Schedulers.io()).map {
repos.toFlowable().parallel().runOn(Schedulers.io()).map {
// Skip submission
if (it.first == "submission")
if (it.id == "submission")
return@map
(repoDB.getRepo(it.first)?.apply {
cached.remove(it.first)
} ?: Repo(it.first)).runCatching {
update(it.second)
(repoDB.getRepo(it.id)?.apply { cached.remove(it.id) } ?:
Repo(it.id)).runCatching {
update(it.pushDate)
repoDB.addRepo(this)
}.getOrElse { Timber.e(it) }
}.sequential()
@ -57,8 +55,8 @@ class RepoUpdater(
}
}
private fun forcedReload(cached: MutableSet<String>) = Flowable.fromIterable(cached)
.parallel().runOn(Schedulers.io()).map {
private fun forcedReload(cached: MutableSet<String>) =
cached.toFlowable().parallel().runOn(Schedulers.io()).map {
runCatching {
Repo(it).update()
}.getOrElse { Timber.e(it) }
@ -67,9 +65,9 @@ class RepoUpdater(
private fun String.trimEtag() = substring(indexOf('\"'), lastIndexOf('\"') + 1)
operator fun invoke(forced: Boolean = false) : Single<Unit> {
val cached = Collections.synchronizedSet(HashSet(repoDB.repoIDSet))
val cached = Collections.synchronizedSet(HashSet(repoDB.repoIDList))
return loadPage(cached, etag = repoDB.etagKey).doOnComplete {
repoDB.removeRepos(cached.toList())
repoDB.removeRepos(cached)
}.onErrorResumeNext { it: Throwable ->
if (it is CachedException) {
if (forced)
@ -78,21 +76,22 @@ class RepoUpdater(
Timber.e(it)
}
Flowable.empty()
}.collect({}, {_, _ -> })
}
companion object {
private val dateFormat: SimpleDateFormat =
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.US).apply {
timeZone = TimeZone.getTimeZone("UTC")
}
}.ignoreElements().toSingleDefault(Unit)
}
object CachedException : Exception()
}
private val dateFormat: SimpleDateFormat =
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.US).apply {
timeZone = TimeZone.getTimeZone("UTC")
}
@JsonSerializable
data class GithubRepoInfo(
@Json(name = "name") val id: String,
val pushed_at: String
)
) {
@Transient
val pushDate = dateFormat.parse(pushed_at)!!
}