Skip to content

Commit

Permalink
Fix OPML import progress not working for groups
Browse files Browse the repository at this point in the history
  • Loading branch information
msasikanth committed Jul 31, 2024
1 parent b18d23d commit c12c93d
Showing 1 changed file with 45 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ import kotlin.time.measureTime
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import me.tatarka.inject.annotations.Inject
Expand Down Expand Up @@ -174,36 +176,55 @@ class OpmlManager(
}

private fun addOpmlSources(sources: List<OpmlSource>): Flow<Int> = channelFlow {
val totalSourcesCount = sources.size
val feeds = sources.filterIsInstance<OpmlFeed>()
val groups = sources.filterIsInstance<OpmlFeedGroup>()
val totalSourcesCount = feeds.size + groups.flatMap { it.feeds }.size
val processedFeedsCount = AtomicInt(0)

if (sources.size > IMPORT_CHUNKS) {
sources.reversed().chunked(IMPORT_CHUNKS).forEach { sourcesInChunk ->
sourcesInChunk.map { source -> launch { createSourceInDB(source) } }.joinAll()
if (feeds.isNotEmpty()) {
addFeeds(feeds, processedFeedsCount, totalSourcesCount)
}

val size = processedFeedsCount.addAndGet(sourcesInChunk.size)
sendProgress(size, totalSourcesCount)
}
} else {
sources.reversed().forEachIndexed { index, source ->
launch { createSourceInDB(source) }.join()
if (groups.isNotEmpty()) {
// Since groups can contain multiple feeds, we don't want to add them in parallel
groups.forEach { group ->
val feedIds = addFeeds(group.feeds, processedFeedsCount, totalSourcesCount)
val groupId = rssRepository.createGroup(group.title)

sendProgress(index, totalSourcesCount)
rssRepository.addFeedIdsToGroups(groupIds = setOf(groupId), feedIds = feedIds)
}
}
}

private suspend fun createSourceInDB(source: OpmlSource) {
when (source) {
is OpmlFeed -> {
addFeed(source)
}
is OpmlFeedGroup -> {
val groupId = rssRepository.createGroup(source.title)
val feedIds = source.feeds.mapNotNull { feed -> addFeed(feed) }
private suspend fun ProducerScope<Int>.addFeeds(
feeds: List<OpmlFeed>,
processedFeedsCount: AtomicInt,
totalFeedsCount: Int,
): List<String> {
return coroutineScope {
val ids: List<String> =
feeds
.reversed()
.chunked(IMPORT_CHUNKS)
.map { sourcesInChunk ->
val ids =
sourcesInChunk
.map { feed ->
async {
addFeed(feed).also {
val progressIndex = processedFeedsCount.incrementAndGet()
send(calculateProgress(progressIndex, totalFeedsCount))
}
}
}
.awaitAll()
.filterNotNull()

rssRepository.addFeedIdsToGroups(groupIds = setOf(groupId), feedIds = feedIds)
}
return@map ids
}
.flatten()

return@coroutineScope ids
}
}

Expand All @@ -217,10 +238,8 @@ class OpmlManager(
return result.feedId
}

private suspend fun ProducerScope<Int>.sendProgress(progressIndex: Int, totalFeedCount: Int) {
// We are converting the total feed count to float
// so that we can get the precise progress like 0.1, 0.2..etc.,
send(((progressIndex / totalFeedCount.toFloat()) * 100).roundToInt())
private fun calculateProgress(progressIndex: Int, totalFeedCount: Int): Int {
return ((progressIndex / totalFeedCount.toFloat()) * 100).roundToInt()
}
}

Expand Down

0 comments on commit c12c93d

Please sign in to comment.