diff --git a/engine/src/androidTest/java/com/google/android/fhir/db/impl/DatabaseImplTest.kt b/engine/src/androidTest/java/com/google/android/fhir/db/impl/DatabaseImplTest.kt index 11c2f699e4..123c474cc5 100644 --- a/engine/src/androidTest/java/com/google/android/fhir/db/impl/DatabaseImplTest.kt +++ b/engine/src/androidTest/java/com/google/android/fhir/db/impl/DatabaseImplTest.kt @@ -56,9 +56,6 @@ import com.google.common.truth.Truth.assertThat import java.math.BigDecimal import java.time.Instant import java.util.Date -import junit.framework.TestCase.assertTrue -import kotlin.system.measureTimeMillis -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.runBlocking @@ -5172,24 +5169,6 @@ class DatabaseImplTest { assertThat(localChangeResourceReferences.size).isEqualTo(locallyCreatedPatients.size) } - @Test - fun `pmap_shouldExecuteMappingTasksInParallel`() = runBlocking { - val numberList = listOf(2, 3) - var squaredNumberList: List - - val timeTaken = measureTimeMillis { - squaredNumberList = - numberList.pmap { - delay(1000L) - it * 2 - } - } - assertTrue(squaredNumberList.isNotEmpty()) - assertThat(squaredNumberList.first()).isEqualTo(4) - assertThat(squaredNumberList.last()).isEqualTo(6) - assertTrue(timeTaken < 2000) - } - private companion object { const val mockEpochTimeStamp = 1628516301000 const val TEST_PATIENT_1_ID = "test_patient_1" diff --git a/engine/src/main/java/com/google/android/fhir/Util.kt b/engine/src/main/java/com/google/android/fhir/Util.kt index 5ad65a52e9..d790171232 100644 --- a/engine/src/main/java/com/google/android/fhir/Util.kt +++ b/engine/src/main/java/com/google/android/fhir/Util.kt @@ -26,6 +26,10 @@ import java.time.ZoneId import java.time.format.DateTimeFormatter import java.util.Date import java.util.Locale +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope import org.hl7.fhir.r4.model.OperationOutcome import org.hl7.fhir.r4.model.Resource import org.hl7.fhir.r4.model.ResourceType @@ -69,6 +73,11 @@ internal fun Resource.isUploadSuccess(): Boolean { outcome.issue.all { it.severity.equals(OperationOutcome.IssueSeverity.INFORMATION) } } +/** Implementation of a parallelized map for CPU intensive tasks */ +suspend fun Iterable.pmapCPU(f: suspend (A) -> B): List = coroutineScope { + map { async(Dispatchers.Default) { f(it) } }.awaitAll() +} + internal class OffsetDateTimeTypeAdapter : TypeAdapter() { override fun write(out: JsonWriter, value: OffsetDateTime) { out.value(DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(value)) diff --git a/engine/src/main/java/com/google/android/fhir/db/impl/DatabaseImpl.kt b/engine/src/main/java/com/google/android/fhir/db/impl/DatabaseImpl.kt index c90fb59249..20b176b2dc 100644 --- a/engine/src/main/java/com/google/android/fhir/db/impl/DatabaseImpl.kt +++ b/engine/src/main/java/com/google/android/fhir/db/impl/DatabaseImpl.kt @@ -38,15 +38,12 @@ import com.google.android.fhir.db.impl.entities.LocalChangeEntity import com.google.android.fhir.db.impl.entities.ResourceEntity import com.google.android.fhir.index.ResourceIndexer import com.google.android.fhir.logicalId +import com.google.android.fhir.pmapCPU import com.google.android.fhir.search.SearchQuery import com.google.android.fhir.toLocalChange import com.google.android.fhir.updateMeta import java.time.Instant import java.util.UUID -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll -import kotlinx.coroutines.coroutineScope import org.hl7.fhir.r4.model.IdType import org.hl7.fhir.r4.model.Resource import org.hl7.fhir.r4.model.ResourceType @@ -232,7 +229,7 @@ internal class DatabaseImpl( query: SearchQuery, ): List> { return db.withTransaction { - resourceDao.getResources(SimpleSQLiteQuery(query.query, query.args.toTypedArray())).pmap { + resourceDao.getResources(SimpleSQLiteQuery(query.query, query.args.toTypedArray())).pmapCPU { ResourceWithUUID( it.uuid, FhirContext.forR4Cached().newJsonParser().parseResource(it.serializedResource) as R, @@ -247,7 +244,7 @@ internal class DatabaseImpl( return db.withTransaction { resourceDao .getForwardReferencedResources(SimpleSQLiteQuery(query.query, query.args.toTypedArray())) - .pmap { + .pmapCPU { ForwardIncludeSearchResult( it.matchingIndex, it.baseResourceUUID, @@ -264,7 +261,7 @@ internal class DatabaseImpl( return db.withTransaction { resourceDao .getReverseReferencedResources(SimpleSQLiteQuery(query.query, query.args.toTypedArray())) - .pmap { + .pmapCPU { ReverseIncludeSearchResult( it.matchingIndex, it.baseResourceTypeAndId, @@ -470,11 +467,6 @@ internal class DatabaseImpl( } } -/** Implementation of a parallelized map */ -suspend fun Iterable.pmap(f: suspend (A) -> B): List = coroutineScope { - map { async(Dispatchers.Default) { f(it) } }.awaitAll() -} - internal data class DatabaseConfig( val inMemory: Boolean, val enableEncryption: Boolean, diff --git a/engine/src/test/java/com/google/android/fhir/UtilTest.kt b/engine/src/test/java/com/google/android/fhir/UtilTest.kt index 65e1a31c2f..470e30b4a2 100644 --- a/engine/src/test/java/com/google/android/fhir/UtilTest.kt +++ b/engine/src/test/java/com/google/android/fhir/UtilTest.kt @@ -19,6 +19,9 @@ package com.google.android.fhir import android.os.Build import com.google.common.truth.Truth.assertThat import junit.framework.TestCase +import kotlin.system.measureTimeMillis +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking import org.hl7.fhir.r4.model.IdType import org.hl7.fhir.r4.model.OperationOutcome import org.hl7.fhir.r4.model.Patient @@ -111,6 +114,24 @@ class UtilTest : TestCase() { assertThat(percentOf(25, 50)).isEqualTo(0.5) } + @Test + fun `pmap() should execute mapping tasks in parallel`() = runBlocking { + val numberList = listOf(2, 3) + var squaredNumberList: List + + val timeTaken = measureTimeMillis { + squaredNumberList = + numberList.pmapCPU { + delay(1000L) + it * 2 + } + } + assertTrue(squaredNumberList.isNotEmpty()) + assertThat(squaredNumberList.first()).isEqualTo(4) + assertThat(squaredNumberList.last()).isEqualTo(6) + assertTrue(timeTaken < 2000) + } + companion object { val TEST_OPERATION_OUTCOME_ERROR = OperationOutcome() val TEST_OPERATION_OUTCOME_INFO = OperationOutcome()