diff --git a/app/src/main/java/com/topjohnwu/magisk/core/model/module/Repo.kt b/app/src/main/java/com/topjohnwu/magisk/core/model/module/Repo.kt index 2bd4f1aaf..d42d3c513 100644 --- a/app/src/main/java/com/topjohnwu/magisk/core/model/module/Repo.kt +++ b/app/src/main/java/com/topjohnwu/magisk/core/model/module/Repo.kt @@ -38,15 +38,8 @@ data class Repo( constructor(id: String) : this(id, "", "", "", -1, "", 0) @Throws(IllegalRepoException::class) - fun update() { - val props = runCatching { - stringRepo.getMetadata(this).blockingGet() - .orEmpty().split("\\n".toRegex()).dropLastWhile { it.isEmpty() } - }.getOrElse { - throw IllegalRepoException("Repo [$id] module.prop download error: " + it.message) - } - - props.runCatching { + private fun loadProps(props: String) { + props.split("\\n".toRegex()).dropLastWhile { it.isEmpty() }.runCatching { parseProps(this) }.onFailure { throw IllegalRepoException("Repo [$id] parse error: " + it.message) @@ -58,9 +51,9 @@ data class Repo( } @Throws(IllegalRepoException::class) - fun update(lastUpdate: Date) { - last_update = lastUpdate.time - update() + suspend fun update(lastUpdate: Date? = null) { + lastUpdate?.let { last_update = it.time } + loadProps(stringRepo.getMetadata(this)) } class IllegalRepoException(message: String) : Exception(message) diff --git a/app/src/main/java/com/topjohnwu/magisk/core/tasks/RepoUpdater.kt b/app/src/main/java/com/topjohnwu/magisk/core/tasks/RepoUpdater.kt index a71d54ff5..d2888165f 100644 --- a/app/src/main/java/com/topjohnwu/magisk/core/tasks/RepoUpdater.kt +++ b/app/src/main/java/com/topjohnwu/magisk/core/tasks/RepoUpdater.kt @@ -5,10 +5,7 @@ import com.topjohnwu.magisk.core.Const import com.topjohnwu.magisk.core.model.module.Repo import com.topjohnwu.magisk.data.database.RepoDao import com.topjohnwu.magisk.data.network.GithubApiServices -import io.reactivex.Completable -import io.reactivex.Flowable -import io.reactivex.rxkotlin.toFlowable -import io.reactivex.schedulers.Schedulers +import kotlinx.coroutines.* import timber.log.Timber import java.net.HttpURLConnection import java.text.SimpleDateFormat @@ -19,69 +16,83 @@ class RepoUpdater( private val api: GithubApiServices, private val repoDB: RepoDao ) { - private fun loadRepos(repos: List, cached: MutableSet) = - repos.toFlowable().parallel().runOn(Schedulers.io()).map { - // Skip submission - if (it.id == "submission") - return@map - val repo = repoDB.getRepo(it.id)?.apply { cached.remove(it.id) } ?: Repo(it.id) - repo.runCatching { - update(it.pushDate) - repoDB.addRepo(this) - }.getOrElse(Timber::e) - }.sequential() - private fun loadPage( - cached: MutableSet, - page: Int = 1, - etag: String = "" - ): Flowable = api.fetchRepos(page, etag).flatMap { - it.error()?.also { throw it } - it.response()?.run { - if (code() == HttpURLConnection.HTTP_NOT_MODIFIED) - return@run Flowable.error(CachedException()) + private fun String.trimEtag() = substring(indexOf('\"'), lastIndexOf('\"') + 1) - if (page == 1) - repoDB.etagKey = headers()[Const.Key.ETAG_KEY].orEmpty().trimEtag() - - val flow = loadRepos(body()!!, cached) - if (headers()[Const.Key.LINK_KEY].orEmpty().contains("next")) { - flow.mergeWith(loadPage(cached, page + 1)) - } else { - flow + private suspend fun forcedReload(cached: MutableSet) = coroutineScope { + cached.forEach { + launch { + val repo = Repo(it) + try { + repo.update() + repoDB.addRepo(repo) + } catch (e: Repo.IllegalRepoException) { + Timber.e(e) + } } } } - private fun forcedReload(cached: MutableSet) = - cached.toFlowable().parallel().runOn(Schedulers.io()).map { - runCatching { - Repo(it).update() - }.getOrElse(Timber::e) - }.sequential() - - private fun String.trimEtag() = substring(indexOf('\"'), lastIndexOf('\"') + 1) - - @Suppress("RedundantLambdaArrow") - operator fun invoke(forced: Boolean) : Completable { - return Flowable - .fromCallable { Collections.synchronizedSet(HashSet(repoDB.repoIDList)) } - .flatMap { cached -> - loadPage(cached, etag = repoDB.etagKey).doOnComplete { - repoDB.removeRepos(cached) - }.onErrorResumeNext { it: Throwable -> - if (it is CachedException) { - if (forced) - return@onErrorResumeNext forcedReload(cached) - } else { - Timber.e(it) - } - Flowable.empty() + private suspend fun loadRepos( + repos: List, + cached: MutableSet + ) = coroutineScope { + repos.forEach { + // Skip submission + if (it.id == "submission") + return@forEach + launch { + val repo = repoDB.getRepo(it.id)?.apply { cached.remove(it.id) } ?: Repo(it.id) + try { + repo.update(it.pushDate) + repoDB.addRepo(repo) + } catch (e: Repo.IllegalRepoException) { + Timber.e(e) } - }.ignoreElements() + } + } } - class CachedException : Exception() + private enum class PageResult { + SUCCESS, + CACHED, + ERROR + } + + private suspend fun loadPage( + cached: MutableSet, + page: Int = 1, + etag: String = "" + ): PageResult = coroutineScope { + val result = api.fetchRepos(page, etag) + result.run { + if (code() == HttpURLConnection.HTTP_NOT_MODIFIED) + return@coroutineScope PageResult.CACHED + + if (!isSuccessful) + return@coroutineScope PageResult.ERROR + + if (page == 1) + repoDB.etagKey = headers()[Const.Key.ETAG_KEY].orEmpty().trimEtag() + + val repoLoad = async { loadRepos(body()!!, cached) } + val next = if (headers()[Const.Key.LINK_KEY].orEmpty().contains("next")) { + async { loadPage(cached, page + 1) } + } else { + async { PageResult.SUCCESS } + } + repoLoad.await() + return@coroutineScope next.await() + } + } + + suspend operator fun invoke(forced: Boolean) = withContext(Dispatchers.IO) { + val cached = Collections.synchronizedSet(HashSet(repoDB.repoIDList)) + when (loadPage(cached, etag = repoDB.etagKey)) { + PageResult.CACHED -> if (forced) forcedReload(cached) + PageResult.SUCCESS -> repoDB.removeRepos(cached) + } + } } private val dateFormat: SimpleDateFormat = diff --git a/app/src/main/java/com/topjohnwu/magisk/data/network/GithubServices.kt b/app/src/main/java/com/topjohnwu/magisk/data/network/GithubServices.kt index 86100e2c1..58e4626c4 100644 --- a/app/src/main/java/com/topjohnwu/magisk/data/network/GithubServices.kt +++ b/app/src/main/java/com/topjohnwu/magisk/data/network/GithubServices.kt @@ -3,10 +3,9 @@ package com.topjohnwu.magisk.data.network import com.topjohnwu.magisk.core.Const import com.topjohnwu.magisk.core.model.UpdateInfo import com.topjohnwu.magisk.core.tasks.GithubRepoInfo -import io.reactivex.Flowable import io.reactivex.Single import okhttp3.ResponseBody -import retrofit2.adapter.rxjava2.Result +import retrofit2.Response import retrofit2.http.* interface GithubRawServices { @@ -40,6 +39,9 @@ interface GithubRawServices { @GET("$MAGISK_MODULES/{$MODULE}/master/{$FILE}") fun fetchModuleInfo(@Path(MODULE) id: String, @Path(FILE) file: String): Single + @GET("$MAGISK_MODULES/{$MODULE}/master/{$FILE}") + suspend fun fetchModuleFile(@Path(MODULE) id: String, @Path(FILE) file: String): String + //endregion /** @@ -70,11 +72,11 @@ interface GithubRawServices { interface GithubApiServices { @GET("repos") - fun fetchRepos( + suspend fun fetchRepos( @Query("page") page: Int, @Header(Const.Key.IF_NONE_MATCH) etag: String, @Query("sort") sort: String = "pushed", @Query("per_page") count: Int = 100 - ): Flowable>> + ): Response> } diff --git a/app/src/main/java/com/topjohnwu/magisk/data/repository/StringRepository.kt b/app/src/main/java/com/topjohnwu/magisk/data/repository/StringRepository.kt index 74806db6c..fb807fe5b 100644 --- a/app/src/main/java/com/topjohnwu/magisk/data/repository/StringRepository.kt +++ b/app/src/main/java/com/topjohnwu/magisk/data/repository/StringRepository.kt @@ -9,7 +9,7 @@ class StringRepository( fun getString(url: String) = api.fetchString(url) - fun getMetadata(repo: Repo) = api.fetchModuleInfo(repo.id, "module.prop") + suspend fun getMetadata(repo: Repo) = api.fetchModuleFile(repo.id, "module.prop") fun getReadme(repo: Repo) = api.fetchModuleInfo(repo.id, "README.md") } diff --git a/app/src/main/java/com/topjohnwu/magisk/ui/module/ModuleViewModel.kt b/app/src/main/java/com/topjohnwu/magisk/ui/module/ModuleViewModel.kt index 2b11492c0..c7ec95935 100644 --- a/app/src/main/java/com/topjohnwu/magisk/ui/module/ModuleViewModel.kt +++ b/app/src/main/java/com/topjohnwu/magisk/ui/module/ModuleViewModel.kt @@ -4,6 +4,7 @@ import android.Manifest import androidx.annotation.WorkerThread import androidx.databinding.Bindable import androidx.databinding.ObservableArrayList +import androidx.lifecycle.viewModelScope import com.topjohnwu.magisk.BR import com.topjohnwu.magisk.R import com.topjohnwu.magisk.core.Config @@ -30,8 +31,10 @@ import io.reactivex.Single import io.reactivex.android.schedulers.AndroidSchedulers import io.reactivex.disposables.Disposable import io.reactivex.schedulers.Schedulers +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import me.tatarka.bindingcollectionadapter2.collections.MergeObservableList -import timber.log.Timber import kotlin.math.roundToInt /* @@ -204,30 +207,28 @@ class ModuleViewModel( @Synchronized fun loadRemote() { // check for existing jobs - if (remoteJob?.isDisposed?.not() == true) { + if (isRemoteLoading) return - } if (itemsRemote.isEmpty()) { EndlessRecyclerScrollListener.ResetState().publish() } - fun loadRemoteDB(offset: Int) = Single - .fromCallable { dao.getRepos(offset) } - .map { it.map { RepoItem.Remote(it) } } - - remoteJob = if (itemsRemote.isEmpty()) { - repoUpdater(refetch).andThen(loadRemoteDB(0)) - } else { - loadRemoteDB(itemsRemote.size) - }.observeOn(AndroidSchedulers.mainThread()) - .doOnSubscribe { isRemoteLoading = true } - .doOnSuccess { isRemoteLoading = false } - .doOnError { isRemoteLoading = false } - .subscribeK(onError = Timber::e) { - itemsRemote.addAll(it) + viewModelScope.launch { + suspend fun loadRemoteDB(offset: Int) = withContext(Dispatchers.IO) { + dao.getRepos(offset).map { RepoItem.Remote(it) } } - refetch = false + isRemoteLoading = true + val repos = if (itemsRemote.isEmpty()) { + repoUpdater(refetch) + loadRemoteDB(0) + } else { + loadRemoteDB(itemsRemote.size) + } + isRemoteLoading = false + itemsRemote.addAll(repos) + refetch = false + } } fun forceRefresh() {