Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
ndegwamartin committed Sep 24, 2024
1 parent 5f8dae9 commit 9d8ca72
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ import com.google.common.truth.Truth.assertThat
import java.math.BigDecimal
import java.time.Instant
import java.util.Date
import junit.framework.TestCase.assertTrue
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
Expand Down Expand Up @@ -5172,24 +5169,6 @@ class DatabaseImplTest {
assertThat(localChangeResourceReferences.size).isEqualTo(locallyCreatedPatients.size)
}

@Test
fun `pmap_shouldExecuteMappingTasksInParallel`() = runBlocking {
val numberList = listOf(2, 3)
var squaredNumberList: List<Int>

val timeTaken = measureTimeMillis {
squaredNumberList =
numberList.pmap {
delay(1000L)
it * 2
}
}
assertTrue(squaredNumberList.isNotEmpty())
assertThat(squaredNumberList.first()).isEqualTo(4)
assertThat(squaredNumberList.last()).isEqualTo(6)
assertTrue(timeTaken < 2000)
}

private companion object {
const val mockEpochTimeStamp = 1628516301000
const val TEST_PATIENT_1_ID = "test_patient_1"
Expand Down
9 changes: 9 additions & 0 deletions engine/src/main/java/com/google/android/fhir/Util.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.Date
import java.util.Locale
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import org.hl7.fhir.r4.model.OperationOutcome
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType
Expand Down Expand Up @@ -69,6 +73,11 @@ internal fun Resource.isUploadSuccess(): Boolean {
outcome.issue.all { it.severity.equals(OperationOutcome.IssueSeverity.INFORMATION) }
}

/** Implementation of a parallelized map for CPU intensive tasks */
suspend fun <A, B> Iterable<A>.pmapCPU(f: suspend (A) -> B): List<B> = coroutineScope {
map { async(Dispatchers.Default) { f(it) } }.awaitAll()
}

internal class OffsetDateTimeTypeAdapter : TypeAdapter<OffsetDateTime>() {
override fun write(out: JsonWriter, value: OffsetDateTime) {
out.value(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(value))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,12 @@ import com.google.android.fhir.db.impl.entities.LocalChangeEntity
import com.google.android.fhir.db.impl.entities.ResourceEntity
import com.google.android.fhir.index.ResourceIndexer
import com.google.android.fhir.logicalId
import com.google.android.fhir.pmapCPU
import com.google.android.fhir.search.SearchQuery
import com.google.android.fhir.toLocalChange
import com.google.android.fhir.updateMeta
import java.time.Instant
import java.util.UUID
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import org.hl7.fhir.r4.model.IdType
import org.hl7.fhir.r4.model.Resource
import org.hl7.fhir.r4.model.ResourceType
Expand Down Expand Up @@ -232,7 +229,7 @@ internal class DatabaseImpl(
query: SearchQuery,
): List<ResourceWithUUID<R>> {
return db.withTransaction {
resourceDao.getResources(SimpleSQLiteQuery(query.query, query.args.toTypedArray())).pmap {
resourceDao.getResources(SimpleSQLiteQuery(query.query, query.args.toTypedArray())).pmapCPU {
ResourceWithUUID(
it.uuid,
FhirContext.forR4Cached().newJsonParser().parseResource(it.serializedResource) as R,
Expand All @@ -247,7 +244,7 @@ internal class DatabaseImpl(
return db.withTransaction {
resourceDao
.getForwardReferencedResources(SimpleSQLiteQuery(query.query, query.args.toTypedArray()))
.pmap {
.pmapCPU {
ForwardIncludeSearchResult(
it.matchingIndex,
it.baseResourceUUID,
Expand All @@ -264,7 +261,7 @@ internal class DatabaseImpl(
return db.withTransaction {
resourceDao
.getReverseReferencedResources(SimpleSQLiteQuery(query.query, query.args.toTypedArray()))
.pmap {
.pmapCPU {
ReverseIncludeSearchResult(
it.matchingIndex,
it.baseResourceTypeAndId,
Expand Down Expand Up @@ -470,11 +467,6 @@ internal class DatabaseImpl(
}
}

/** Implementation of a parallelized map */
suspend fun <A, B> Iterable<A>.pmap(f: suspend (A) -> B): List<B> = coroutineScope {
map { async(Dispatchers.Default) { f(it) } }.awaitAll()
}

internal data class DatabaseConfig(
val inMemory: Boolean,
val enableEncryption: Boolean,
Expand Down
21 changes: 21 additions & 0 deletions engine/src/test/java/com/google/android/fhir/UtilTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ package com.google.android.fhir
import android.os.Build
import com.google.common.truth.Truth.assertThat
import junit.framework.TestCase
import kotlin.system.measureTimeMillis
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.hl7.fhir.r4.model.IdType
import org.hl7.fhir.r4.model.OperationOutcome
import org.hl7.fhir.r4.model.Patient
Expand Down Expand Up @@ -111,6 +114,24 @@ class UtilTest : TestCase() {
assertThat(percentOf(25, 50)).isEqualTo(0.5)
}

@Test
fun `pmap() should execute mapping tasks in parallel`() = runBlocking {
val numberList = listOf(2, 3)
var squaredNumberList: List<Int>

val timeTaken = measureTimeMillis {
squaredNumberList =
numberList.pmapCPU {
delay(1000L)
it * 2
}
}
assertTrue(squaredNumberList.isNotEmpty())
assertThat(squaredNumberList.first()).isEqualTo(4)
assertThat(squaredNumberList.last()).isEqualTo(6)
assertTrue(timeTaken < 2000)
}

companion object {
val TEST_OPERATION_OUTCOME_ERROR = OperationOutcome()
val TEST_OPERATION_OUTCOME_INFO = OperationOutcome()
Expand Down

0 comments on commit 9d8ca72

Please sign in to comment.