This commit is contained in:
junkfood
2025-09-02 23:04:51 +08:00
parent 8b3248d98d
commit 2fd9a0b21b
5 changed files with 210 additions and 191 deletions

View File

@@ -92,7 +92,11 @@ abstract class AbstractRssRepository(
}
}
abstract suspend fun sync(feedId: String?, groupId: String?): ListenableWorker.Result
abstract suspend fun sync(
accountId: Int,
feedId: String?,
groupId: String?
): ListenableWorker.Result
open suspend fun markAsRead(
groupId: String?,

View File

@@ -60,7 +60,9 @@ constructor(
fun getAccounts(): Flow<List<Account>> = accountDao.queryAllAsFlow()
fun getAccountById(accountId: Int): Flow<Account?> = accountDao.queryAccount(accountId)
fun getAccountFlowById(accountId: Int): Flow<Account?> = accountDao.queryAccount(accountId)
suspend fun getAccountById(accountId: Int): Account? = accountDao.queryById(accountId)
fun getCurrentAccount(): Account = runBlocking {
currentAccountFlow.first { it != null } as Account

View File

@@ -28,6 +28,7 @@ import kotlinx.coroutines.sync.withPermit
import me.ash.reader.R
import me.ash.reader.domain.data.SyncLogger
import me.ash.reader.domain.model.account.Account
import me.ash.reader.domain.model.account.AccountType
import me.ash.reader.domain.model.account.AccountType.Companion.FreshRSS
import me.ash.reader.domain.model.account.security.GoogleReaderSecurityKey
import me.ash.reader.domain.model.article.Article
@@ -219,11 +220,15 @@ constructor(
super.deleteFeed(feed, false)
}
override suspend fun sync(feedId: String?, groupId: String?): ListenableWorker.Result {
override suspend fun sync(
accountId: Int,
feedId: String?,
groupId: String?
): ListenableWorker.Result {
return if (feedId != null) {
syncFeed(feedId)
syncFeed(accountId, feedId)
} else {
sync()
sync(accountId)
}
}
@@ -247,14 +252,14 @@ constructor(
* @link https://github.com/theoldreader/api
*/
@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun sync(): ListenableWorker.Result = coroutineScope {
private suspend fun sync(accountId: Int): ListenableWorker.Result = coroutineScope {
val preTime = System.currentTimeMillis()
val preDate = Date(preTime)
try {
val accountId = accountService.getCurrentAccountId()
val account = accountService.getCurrentAccount()
val account = accountService.getAccountById(accountId)
requireNotNull(account) { "cannot find account" }
check(account.type == AccountType.GoogleReader || account.type == AccountType.FreshRSS) { "account type is invalid" }
val googleReaderAPI = getGoogleReaderAPI()
googleReaderAPI.refreshCredentialsIfNeeded()
val lastMonthAt =
@@ -281,12 +286,12 @@ constructor(
val isFreshRss = account.type.id == FreshRSS.id
val remoteReadIds = async {
fetchItemIdsAndContinue {
googleReaderAPI.getReadItemIds(
since = lastMonthAt,
continuationId = it,
useIt = isFreshRss,
)
}
googleReaderAPI.getReadItemIds(
since = lastMonthAt,
continuationId = it,
useIt = isFreshRss,
)
}
.map { it.shortId }
.toSet()
}
@@ -380,13 +385,13 @@ constructor(
val deferredList =
fetchItemsContentsDeferred(
itemIds = toBeSync.await(),
googleReaderAPI = googleReaderAPI,
accountId = accountId,
unreadIds = remoteUnreadIds.await(),
starredIds = remoteStarredIds.await(),
scope = this,
)
itemIds = toBeSync.await(),
googleReaderAPI = googleReaderAPI,
accountId = accountId,
unreadIds = remoteUnreadIds.await(),
starredIds = remoteStarredIds.await(),
scope = this,
)
.toMutableList()
val remoteGroups = async { groupWithFeedsMap.await().keys.toList() }
@@ -420,21 +425,21 @@ constructor(
if (deferredList.isNotEmpty()) {
launch {
whileSelect {
for (deferred in deferredList) {
deferred.onAwait {
articleDao.insertList(it)
articlesToNotify.addAll(
it.fastFilter {
it.isUnread && notificationFeedIds.contains(it.feedId)
}
)
deferredList.remove(deferred)
deferredList.isNotEmpty()
}
whileSelect {
for (deferred in deferredList) {
deferred.onAwait {
articleDao.insertList(it)
articlesToNotify.addAll(
it.fastFilter {
it.isUnread && notificationFeedIds.contains(it.feedId)
}
)
deferredList.remove(deferred)
deferredList.isNotEmpty()
}
}
}
}
.invokeOnCompletion {
launch {
articlesToNotify
@@ -486,112 +491,113 @@ constructor(
}
}
private suspend fun syncFeed(feedId: String): ListenableWorker.Result = supervisorScope {
val preTime = System.currentTimeMillis()
val account = accountService.getCurrentAccount()
requireNotNull(account) { "cannot find account" }
val accountId = account.id!!
val googleReaderAPI = getGoogleReaderAPI()
private suspend fun syncFeed(accountId: Int, feedId: String): ListenableWorker.Result =
supervisorScope {
val preTime = System.currentTimeMillis()
val account = accountService.getAccountById(accountId)
requireNotNull(account) { "cannot find account" }
check(account.type == AccountType.GoogleReader || account.type == AccountType.FreshRSS) { "account type is invalid" }
val googleReaderAPI = getGoogleReaderAPI()
val feed = feedDao.queryById(feedId)!!
val localStarredIds =
articleDao.queryMetadataByFeedId(accountId, feedId, isUnread = true).map {
it.id.remoteId
}
val localUnreadIds =
articleDao.queryMetadataByFeedId(accountId, feedId, isUnread = true).map {
it.id.remoteId
}
val localReadIds =
articleDao.queryMetadataByFeedId(accountId, feedId, isUnread = false).map {
it.id.remoteId
}
val feed = feedDao.queryById(feedId)!!
val localStarredIds =
articleDao.queryMetadataByFeedId(accountId, feedId, isUnread = true).map {
it.id.remoteId
}
val localUnreadIds =
articleDao.queryMetadataByFeedId(accountId, feedId, isUnread = true).map {
it.id.remoteId
}
val localReadIds =
articleDao.queryMetadataByFeedId(accountId, feedId, isUnread = false).map {
it.id.remoteId
}
val localIds = (localReadIds + localUnreadIds)
val localIds = (localReadIds + localUnreadIds)
val remoteUnreadIds = async {
fetchItemIdsAndContinue {
val remoteUnreadIds = async {
fetchItemIdsAndContinue {
googleReaderAPI.getItemIdsForFeed(
feedId = feedId.dollarLast(),
filterRead = true,
continuationId = it,
)
}
.map { it.shortId }
.toSet()
}
.map { it.shortId }
.toSet()
}
val remoteAllIds = async {
fetchItemIdsAndContinue {
val remoteAllIds = async {
fetchItemIdsAndContinue {
googleReaderAPI.getItemIdsForFeed(
feedId = feedId.dollarLast(),
filterRead = false,
continuationId = it,
)
}
.map { it.shortId }
.toSet()
.map { it.shortId }
.toSet()
}
val remoteStarredIds = async {
fetchItemIdsAndContinue { googleReaderAPI.getStarredItemIds(continuationId = it) }
.map { it.shortId }
.toSet()
}
val toFetch = remoteAllIds.await() - localIds
val items =
fetchItemsContents(
itemIds = toFetch,
googleReaderAPI = googleReaderAPI,
accountId = accountId,
unreadIds = remoteUnreadIds.await(),
starredIds = remoteStarredIds.await(),
)
if (feed.isNotification) {
val articlesToNotify = items.fastFilter { it.isUnread }
notificationHelper.notify(feed, articlesToNotify)
}
launch {
val remoteReadIds = remoteAllIds.await() - remoteUnreadIds.await()
val toBeReadIds = remoteReadIds.intersect(localUnreadIds)
toBeReadIds
.map { it.dbId(accountId) }
.chunked(1000)
.forEach {
articleDao.markAsReadByIdSet(
accountId = accountId,
ids = it.toSet(),
isUnread = false,
)
}
}
launch {
val toBeStarred = remoteStarredIds.await().intersect(localIds) - localStarredIds
toBeStarred
.map { it.dbId(accountId) }
.chunked(1000)
.forEach {
articleDao.markAsStarredByIdSet(
accountId = accountId,
ids = it.toSet(),
isStarred = true,
)
}
}
articleDao.insert(*items.toTypedArray())
Timber.i("onCompletion: ${System.currentTimeMillis() - preTime}")
ListenableWorker.Result.success()
}
val remoteStarredIds = async {
fetchItemIdsAndContinue { googleReaderAPI.getStarredItemIds(continuationId = it) }
.map { it.shortId }
.toSet()
}
val toFetch = remoteAllIds.await() - localIds
val items =
fetchItemsContents(
itemIds = toFetch,
googleReaderAPI = googleReaderAPI,
accountId = accountId,
unreadIds = remoteUnreadIds.await(),
starredIds = remoteStarredIds.await(),
)
if (feed.isNotification) {
val articlesToNotify = items.fastFilter { it.isUnread }
notificationHelper.notify(feed, articlesToNotify)
}
launch {
val remoteReadIds = remoteAllIds.await() - remoteUnreadIds.await()
val toBeReadIds = remoteReadIds.intersect(localUnreadIds)
toBeReadIds
.map { it.dbId(accountId) }
.chunked(1000)
.forEach {
articleDao.markAsReadByIdSet(
accountId = accountId,
ids = it.toSet(),
isUnread = false,
)
}
}
launch {
val toBeStarred = remoteStarredIds.await().intersect(localIds) - localStarredIds
toBeStarred
.map { it.dbId(accountId) }
.chunked(1000)
.forEach {
articleDao.markAsStarredByIdSet(
accountId = accountId,
ids = it.toSet(),
isStarred = true,
)
}
}
articleDao.insert(*items.toTypedArray())
Timber.i("onCompletion: ${System.currentTimeMillis() - preTime}")
ListenableWorker.Result.success()
}
private suspend fun fetchItemIdsAndContinue(
getItemIdsFunc: suspend (continuationId: String?) -> GoogleReaderDTO.ItemIds?
): MutableList<String> {
@@ -670,13 +676,13 @@ constructor(
starredIds: Set<String>,
): List<Article> = supervisorScope {
fetchItemsContentsDeferred(
itemIds = itemIds,
googleReaderAPI = googleReaderAPI,
accountId = accountId,
unreadIds = unreadIds,
starredIds = starredIds,
scope = this,
)
itemIds = itemIds,
googleReaderAPI = googleReaderAPI,
accountId = accountId,
unreadIds = unreadIds,
starredIds = starredIds,
scope = this,
)
.awaitAll()
.flatten()
}
@@ -700,28 +706,28 @@ constructor(
when {
groupId != null -> {
if (before == null) {
articleDao.queryMetadataByGroupIdWhenIsUnread(
accountId,
groupId,
!isUnread,
)
} else {
articleDao.queryMetadataByGroupIdWhenIsUnread(
accountId,
groupId,
!isUnread,
before,
)
}
articleDao.queryMetadataByGroupIdWhenIsUnread(
accountId,
groupId,
!isUnread,
)
} else {
articleDao.queryMetadataByGroupIdWhenIsUnread(
accountId,
groupId,
!isUnread,
before,
)
}
.map { it.id.dollarLast() }
}
feedId != null -> {
if (before == null) {
articleDao.queryMetadataByFeedId(accountId, feedId, !isUnread)
} else {
articleDao.queryMetadataByFeedId(accountId, feedId, !isUnread, before)
}
articleDao.queryMetadataByFeedId(accountId, feedId, !isUnread)
} else {
articleDao.queryMetadataByFeedId(accountId, feedId, !isUnread, before)
}
.map { it.id.dollarLast() }
}
@@ -731,10 +737,10 @@ constructor(
else -> {
if (before == null) {
articleDao.queryMetadataAll(accountId, !isUnread)
} else {
articleDao.queryMetadataAll(accountId, !isUnread, before)
}
articleDao.queryMetadataAll(accountId, !isUnread)
} else {
articleDao.queryMetadataAll(accountId, !isUnread, before)
}
.map { it.id.dollarLast() }
}
}

View File

@@ -14,6 +14,7 @@ import kotlinx.coroutines.supervisorScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import me.ash.reader.domain.data.SyncLogger
import me.ash.reader.domain.model.account.AccountType
import me.ash.reader.domain.model.feed.Feed
import me.ash.reader.domain.model.feed.FeedWithArticle
import me.ash.reader.domain.repository.ArticleDao
@@ -54,55 +55,61 @@ constructor(
accountService,
) {
override suspend fun sync(feedId: String?, groupId: String?) = supervisorScope {
override suspend fun sync(
accountId: Int,
feedId: String?,
groupId: String?
): ListenableWorker.Result = supervisorScope {
return@supervisorScope runCatching {
val preTime = System.currentTimeMillis()
val preDate = Date(preTime)
val currentAccount = accountService.getCurrentAccount()
val accountId = currentAccount.id!!
val semaphore = Semaphore(16)
val preTime = System.currentTimeMillis()
val preDate = Date(preTime)
val currentAccount = accountService.getAccountById(accountId)!!
require(currentAccount.type == AccountType.Local) {
"Account type is invalid"
}
val semaphore = Semaphore(16)
val feedsToSync =
when {
feedId != null -> listOfNotNull(feedDao.queryById(feedId))
groupId != null -> feedDao.queryByGroupId(accountId, groupId)
else -> feedDao.queryAll(accountId)
}
val feedsToSync =
when {
feedId != null -> listOfNotNull(feedDao.queryById(feedId))
groupId != null -> feedDao.queryByGroupId(accountId, groupId)
else -> feedDao.queryAll(accountId)
}
feedsToSync
.mapIndexed { _, currentFeed ->
async(Dispatchers.IO) {
semaphore.withPermit {
val archivedArticles =
feedDao
.queryArchivedArticles(currentFeed.id)
.map { it.link }
.toSet()
val fetchedFeed = syncFeed(currentFeed, preDate)
val fetchedArticles =
fetchedFeed.articles.filterNot {
archivedArticles.contains(it.link)
}
val newArticles =
articleDao.insertListIfNotExist(
articles = fetchedArticles,
feed = currentFeed,
)
if (currentFeed.isNotification && newArticles.isNotEmpty()) {
notificationHelper.notify(
fetchedFeed.copy(articles = newArticles, feed = currentFeed)
)
feedsToSync
.mapIndexed { _, currentFeed ->
async(Dispatchers.IO) {
semaphore.withPermit {
val archivedArticles =
feedDao
.queryArchivedArticles(currentFeed.id)
.map { it.link }
.toSet()
val fetchedFeed = syncFeed(currentFeed, preDate)
val fetchedArticles =
fetchedFeed.articles.filterNot {
archivedArticles.contains(it.link)
}
val newArticles =
articleDao.insertListIfNotExist(
articles = fetchedArticles,
feed = currentFeed,
)
if (currentFeed.isNotification && newArticles.isNotEmpty()) {
notificationHelper.notify(
fetchedFeed.copy(articles = newArticles, feed = currentFeed)
)
}
}
}
.awaitAll()
}
.awaitAll()
Timber.tag("RlOG").i("onCompletion: ${System.currentTimeMillis() - preTime}")
accountService.update(currentAccount.copy(updateAt = Date()))
ListenableWorker.Result.success()
}
Timber.tag("RlOG").i("onCompletion: ${System.currentTimeMillis() - preTime}")
accountService.update(currentAccount.copy(updateAt = Date()))
ListenableWorker.Result.success()
}
.onFailure { syncLogger.log(it) }
.getOrNull() ?: ListenableWorker.Result.retry()
}

View File

@@ -46,7 +46,7 @@ class AccountViewModel @Inject constructor(
fun initData(accountId: Int) {
viewModelScope.launch(ioDispatcher) {
_accountUiState.update { it.copy(selectedAccount = accountService.getAccountById(accountId)) }
_accountUiState.update { it.copy(selectedAccount = accountService.getAccountFlowById(accountId)) }
}
}