feat(sync): sync feeds concurrently

This commit is contained in:
junkfood
2024-11-24 21:51:17 +08:00
parent 99eca4b109
commit 54cebbc9ca

View File

@@ -8,13 +8,17 @@ import androidx.work.ListenableWorker
import androidx.work.WorkManager
import com.rometools.rome.feed.synd.SyndFeed
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import me.ash.reader.domain.model.account.Account
import me.ash.reader.domain.model.article.ArticleWithFeed
import me.ash.reader.domain.model.feed.Feed
@@ -101,21 +105,18 @@ abstract class AbstractRssRepository(
val preTime = System.currentTimeMillis()
val preDate = Date(preTime)
val accountId = context.currentAccountId
feedDao.queryAll(accountId)
.chunked(16)
.forEach {
it.map { feed -> async { syncFeed(feed, preDate) } }
.awaitAll()
.forEach {
if (it.feed.isNotification) {
notificationHelper.notify(it.apply {
articles = articleDao.insertListIfNotExist(it.articles)
})
} else {
articleDao.insertListIfNotExist(it.articles)
}
val semaphore = Semaphore(16)
feedDao.queryAll(accountId).mapIndexed { _, feed ->
async(Dispatchers.IO) {
semaphore.withPermit {
val feedWithArticle = syncFeed(feed, preDate)
val newArticles = articleDao.insertListIfNotExist(feedWithArticle.articles)
if (feedWithArticle.feed.isNotification) {
notificationHelper.notify(feedWithArticle.copy(articles = newArticles))
}
}
}
}.awaitAll()
Log.i("RlOG", "onCompletion: ${System.currentTimeMillis() - preTime}")
accountDao.queryById(accountId)?.let { account ->