Load repos with coroutine

This commit is contained in:
topjohnwu 2020-07-07 01:57:56 -07:00
parent 820427e93b
commit 77032eced1
5 changed files with 100 additions and 93 deletions

View File

@ -38,15 +38,8 @@ data class Repo(
constructor(id: String) : this(id, "", "", "", -1, "", 0)
@Throws(IllegalRepoException::class)
fun update() {
val props = runCatching {
stringRepo.getMetadata(this).blockingGet()
.orEmpty().split("\\n".toRegex()).dropLastWhile { it.isEmpty() }
}.getOrElse {
throw IllegalRepoException("Repo [$id] module.prop download error: " + it.message)
}
props.runCatching {
private fun loadProps(props: String) {
props.split("\\n".toRegex()).dropLastWhile { it.isEmpty() }.runCatching {
parseProps(this)
}.onFailure {
throw IllegalRepoException("Repo [$id] parse error: " + it.message)
@ -58,9 +51,9 @@ data class Repo(
}
@Throws(IllegalRepoException::class)
fun update(lastUpdate: Date) {
last_update = lastUpdate.time
update()
suspend fun update(lastUpdate: Date? = null) {
lastUpdate?.let { last_update = it.time }
loadProps(stringRepo.getMetadata(this))
}
class IllegalRepoException(message: String) : Exception(message)

View File

@ -5,10 +5,7 @@ import com.topjohnwu.magisk.core.Const
import com.topjohnwu.magisk.core.model.module.Repo
import com.topjohnwu.magisk.data.database.RepoDao
import com.topjohnwu.magisk.data.network.GithubApiServices
import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.rxkotlin.toFlowable
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.*
import timber.log.Timber
import java.net.HttpURLConnection
import java.text.SimpleDateFormat
@ -19,69 +16,83 @@ class RepoUpdater(
private val api: GithubApiServices,
private val repoDB: RepoDao
) {
private fun loadRepos(repos: List<GithubRepoInfo>, cached: MutableSet<String>) =
repos.toFlowable().parallel().runOn(Schedulers.io()).map {
// Skip submission
if (it.id == "submission")
return@map
val repo = repoDB.getRepo(it.id)?.apply { cached.remove(it.id) } ?: Repo(it.id)
repo.runCatching {
update(it.pushDate)
repoDB.addRepo(this)
}.getOrElse(Timber::e)
}.sequential()
private fun loadPage(
cached: MutableSet<String>,
page: Int = 1,
etag: String = ""
): Flowable<Unit> = api.fetchRepos(page, etag).flatMap {
it.error()?.also { throw it }
it.response()?.run {
if (code() == HttpURLConnection.HTTP_NOT_MODIFIED)
return@run Flowable.error<Unit>(CachedException())
private fun String.trimEtag() = substring(indexOf('\"'), lastIndexOf('\"') + 1)
if (page == 1)
repoDB.etagKey = headers()[Const.Key.ETAG_KEY].orEmpty().trimEtag()
val flow = loadRepos(body()!!, cached)
if (headers()[Const.Key.LINK_KEY].orEmpty().contains("next")) {
flow.mergeWith(loadPage(cached, page + 1))
} else {
flow
private suspend fun forcedReload(cached: MutableSet<String>) = coroutineScope {
cached.forEach {
launch {
val repo = Repo(it)
try {
repo.update()
repoDB.addRepo(repo)
} catch (e: Repo.IllegalRepoException) {
Timber.e(e)
}
}
}
}
private fun forcedReload(cached: MutableSet<String>) =
cached.toFlowable().parallel().runOn(Schedulers.io()).map {
runCatching {
Repo(it).update()
}.getOrElse(Timber::e)
}.sequential()
private fun String.trimEtag() = substring(indexOf('\"'), lastIndexOf('\"') + 1)
@Suppress("RedundantLambdaArrow")
operator fun invoke(forced: Boolean) : Completable {
return Flowable
.fromCallable { Collections.synchronizedSet(HashSet(repoDB.repoIDList)) }
.flatMap { cached ->
loadPage(cached, etag = repoDB.etagKey).doOnComplete {
repoDB.removeRepos(cached)
}.onErrorResumeNext { it: Throwable ->
if (it is CachedException) {
if (forced)
return@onErrorResumeNext forcedReload(cached)
} else {
Timber.e(it)
}
Flowable.empty()
private suspend fun loadRepos(
repos: List<GithubRepoInfo>,
cached: MutableSet<String>
) = coroutineScope {
repos.forEach {
// Skip submission
if (it.id == "submission")
return@forEach
launch {
val repo = repoDB.getRepo(it.id)?.apply { cached.remove(it.id) } ?: Repo(it.id)
try {
repo.update(it.pushDate)
repoDB.addRepo(repo)
} catch (e: Repo.IllegalRepoException) {
Timber.e(e)
}
}.ignoreElements()
}
}
}
class CachedException : Exception()
private enum class PageResult {
SUCCESS,
CACHED,
ERROR
}
private suspend fun loadPage(
cached: MutableSet<String>,
page: Int = 1,
etag: String = ""
): PageResult = coroutineScope {
val result = api.fetchRepos(page, etag)
result.run {
if (code() == HttpURLConnection.HTTP_NOT_MODIFIED)
return@coroutineScope PageResult.CACHED
if (!isSuccessful)
return@coroutineScope PageResult.ERROR
if (page == 1)
repoDB.etagKey = headers()[Const.Key.ETAG_KEY].orEmpty().trimEtag()
val repoLoad = async { loadRepos(body()!!, cached) }
val next = if (headers()[Const.Key.LINK_KEY].orEmpty().contains("next")) {
async { loadPage(cached, page + 1) }
} else {
async { PageResult.SUCCESS }
}
repoLoad.await()
return@coroutineScope next.await()
}
}
suspend operator fun invoke(forced: Boolean) = withContext(Dispatchers.IO) {
val cached = Collections.synchronizedSet(HashSet(repoDB.repoIDList))
when (loadPage(cached, etag = repoDB.etagKey)) {
PageResult.CACHED -> if (forced) forcedReload(cached)
PageResult.SUCCESS -> repoDB.removeRepos(cached)
}
}
}
private val dateFormat: SimpleDateFormat =

View File

@ -3,10 +3,9 @@ package com.topjohnwu.magisk.data.network
import com.topjohnwu.magisk.core.Const
import com.topjohnwu.magisk.core.model.UpdateInfo
import com.topjohnwu.magisk.core.tasks.GithubRepoInfo
import io.reactivex.Flowable
import io.reactivex.Single
import okhttp3.ResponseBody
import retrofit2.adapter.rxjava2.Result
import retrofit2.Response
import retrofit2.http.*
interface GithubRawServices {
@ -40,6 +39,9 @@ interface GithubRawServices {
@GET("$MAGISK_MODULES/{$MODULE}/master/{$FILE}")
fun fetchModuleInfo(@Path(MODULE) id: String, @Path(FILE) file: String): Single<String>
@GET("$MAGISK_MODULES/{$MODULE}/master/{$FILE}")
suspend fun fetchModuleFile(@Path(MODULE) id: String, @Path(FILE) file: String): String
//endregion
/**
@ -70,11 +72,11 @@ interface GithubRawServices {
interface GithubApiServices {
@GET("repos")
fun fetchRepos(
suspend fun fetchRepos(
@Query("page") page: Int,
@Header(Const.Key.IF_NONE_MATCH) etag: String,
@Query("sort") sort: String = "pushed",
@Query("per_page") count: Int = 100
): Flowable<Result<List<GithubRepoInfo>>>
): Response<List<GithubRepoInfo>>
}

View File

@ -9,7 +9,7 @@ class StringRepository(
fun getString(url: String) = api.fetchString(url)
fun getMetadata(repo: Repo) = api.fetchModuleInfo(repo.id, "module.prop")
suspend fun getMetadata(repo: Repo) = api.fetchModuleFile(repo.id, "module.prop")
fun getReadme(repo: Repo) = api.fetchModuleInfo(repo.id, "README.md")
}

View File

@ -4,6 +4,7 @@ import android.Manifest
import androidx.annotation.WorkerThread
import androidx.databinding.Bindable
import androidx.databinding.ObservableArrayList
import androidx.lifecycle.viewModelScope
import com.topjohnwu.magisk.BR
import com.topjohnwu.magisk.R
import com.topjohnwu.magisk.core.Config
@ -30,8 +31,10 @@ import io.reactivex.Single
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import me.tatarka.bindingcollectionadapter2.collections.MergeObservableList
import timber.log.Timber
import kotlin.math.roundToInt
/*
@ -204,30 +207,28 @@ class ModuleViewModel(
@Synchronized
fun loadRemote() {
// check for existing jobs
if (remoteJob?.isDisposed?.not() == true) {
if (isRemoteLoading)
return
}
if (itemsRemote.isEmpty()) {
EndlessRecyclerScrollListener.ResetState().publish()
}
fun loadRemoteDB(offset: Int) = Single
.fromCallable { dao.getRepos(offset) }
.map { it.map { RepoItem.Remote(it) } }
remoteJob = if (itemsRemote.isEmpty()) {
repoUpdater(refetch).andThen(loadRemoteDB(0))
} else {
loadRemoteDB(itemsRemote.size)
}.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe { isRemoteLoading = true }
.doOnSuccess { isRemoteLoading = false }
.doOnError { isRemoteLoading = false }
.subscribeK(onError = Timber::e) {
itemsRemote.addAll(it)
viewModelScope.launch {
suspend fun loadRemoteDB(offset: Int) = withContext(Dispatchers.IO) {
dao.getRepos(offset).map { RepoItem.Remote(it) }
}
refetch = false
isRemoteLoading = true
val repos = if (itemsRemote.isEmpty()) {
repoUpdater(refetch)
loadRemoteDB(0)
} else {
loadRemoteDB(itemsRemote.size)
}
isRemoteLoading = false
itemsRemote.addAll(repos)
refetch = false
}
}
fun forceRefresh() {