diff --git a/src/main/kotlin/com/openlattice/mechanic/pods/MechanicUpgradePod.kt b/src/main/kotlin/com/openlattice/mechanic/pods/MechanicUpgradePod.kt index 9f933923..65207fff 100644 --- a/src/main/kotlin/com/openlattice/mechanic/pods/MechanicUpgradePod.kt +++ b/src/main/kotlin/com/openlattice/mechanic/pods/MechanicUpgradePod.kt @@ -61,10 +61,7 @@ import com.openlattice.linking.PostgresLinkingFeedbackService import com.openlattice.linking.graph.PostgresLinkingQueryService import com.openlattice.mechanic.MechanicCli.Companion.UPGRADE import com.openlattice.mechanic.Toolbox -import com.openlattice.mechanic.upgrades.DeleteOrgMetadataEntitySets -import com.openlattice.mechanic.upgrades.ExportOrganizationMembers -import com.openlattice.mechanic.upgrades.MigrateChronicleSystemApps -import com.openlattice.mechanic.upgrades.V3StudyMigrationUpgrade +import com.openlattice.mechanic.upgrades.* import com.openlattice.organizations.roles.HazelcastPrincipalService import com.openlattice.organizations.roles.SecurePrincipalsManager import com.openlattice.postgres.PostgresTable @@ -373,9 +370,10 @@ class MechanicUpgradePod { @Bean fun exportOrganizationMembers(): ExportOrganizationMembers { - return ExportOrganizationMembers(toolbox, hikariDataSource, principalService(), hazelcastInstance) + return ExportOrganizationMembers(toolbox, hikariDataSource, principalService(), hazelcastInstance, rhizomeConfiguration) } + @Bean fun migrateChronicleSystemApps(): MigrateChronicleSystemApps { return MigrateChronicleSystemApps( toolbox, @@ -385,6 +383,78 @@ class MechanicUpgradePod { ) } + @Bean + fun migrateAppUsageSurveyData(): MigrateAppUsageSurveyData { + return MigrateAppUsageSurveyData( + toolbox, + rhizomeConfiguration, + authorizationService(), + principalService(), + searchService(), + dataQueryService(), + entitySetService() + ) + } + + @Bean + fun migrateTimeUseDiarySummarizedData(): MigrateTimeUseDiarySummarizedData { + return MigrateTimeUseDiarySummarizedData( + toolbox, + rhizomeConfiguration, + principalService(), + searchService(), + dataQueryService(), + entitySetService() + ) + } + + @Bean + fun migrateOrgSettingsToStudies(): MigrateOrgSettingsToStudies { + return MigrateOrgSettingsToStudies( + toolbox, + entitySetService(), + rhizomeConfiguration, + dataQueryService() + ) + } + + @Bean + fun migrateChronicleParticipantStats(): MigrateChronicleParticipantStats { + return MigrateChronicleParticipantStats( + toolbox, + searchService(), + principalService(), + entitySetService(), + dataQueryService(), + rhizomeConfiguration, + authorizationService() + ) + } + + @Bean + fun migrateTudSubmissions(): MigrateTimeUseDiarySubmissions { + return MigrateTimeUseDiarySubmissions( + toolbox, + rhizomeConfiguration, + dataQueryService(), + entitySetService(), + searchService(), + principalService() + ) + } + + @Bean + fun migratePreprocessedData() : MigratePreprocessedData { + return MigratePreprocessedData( + toolbox, + rhizomeConfiguration, + dataQueryService(), + entitySetService(), + searchService(), + principalService() + ) + } + @PostConstruct fun post() { Principals.init(principalService(), hazelcastInstance) diff --git a/src/main/kotlin/com/openlattice/mechanic/upgrades/ExportOrganizationMembers.kt b/src/main/kotlin/com/openlattice/mechanic/upgrades/ExportOrganizationMembers.kt index 45df393a..9d8ff6ae 100644 --- a/src/main/kotlin/com/openlattice/mechanic/upgrades/ExportOrganizationMembers.kt +++ b/src/main/kotlin/com/openlattice/mechanic/upgrades/ExportOrganizationMembers.kt @@ -1,9 +1,11 @@ package 
com.openlattice.mechanic.upgrades +import com.geekbeast.rhizome.configuration.RhizomeConfiguration import com.hazelcast.core.HazelcastInstance import com.openlattice.hazelcast.HazelcastMap import com.openlattice.mechanic.Toolbox import com.openlattice.organizations.roles.SecurePrincipalsManager +import com.zaxxer.hikari.HikariConfig import com.zaxxer.hikari.HikariDataSource import org.slf4j.LoggerFactory @@ -15,12 +17,13 @@ class ExportOrganizationMembers( private val toolbox: Toolbox, private val hds: HikariDataSource, private val principalService: SecurePrincipalsManager, - hazelcast :HazelcastInstance + hazelcast :HazelcastInstance, + private val rhizomeConfiguration: RhizomeConfiguration ) : Upgrade { companion object { private const val USERS_EXPORT_TABLE_NAME = "users_export" const val USERS_EXPORT_TABLE = """ - CREATE TABLE $USERS_EXPORT_TABLE_NAME ( + CREATE TABLE IF NOT EXISTS $USERS_EXPORT_TABLE_NAME ( organization_id uuid, principal_id text NOT NULL, principal_email text, @@ -36,11 +39,16 @@ class ExportOrganizationMembers( private val organizations = HazelcastMap.ORGANIZATIONS.getMap(hazelcast) private val users = HazelcastMap.USERS.getMap(hazelcast) + private fun getDatasource(): HikariDataSource { + val (hikariConfiguration) = rhizomeConfiguration.datasourceConfigurations["chronicle"]!! + val hc = HikariConfig(hikariConfiguration) + return HikariDataSource(hc) + } override fun upgrade(): Boolean { val organizationsIds = organizations.keys.toMutableSet() val members = principalService.getOrganizationMembers(organizationsIds) - hds.connection.use { connection -> + getDatasource().connection.use { connection -> connection.createStatement().use { stmt -> stmt.execute(USERS_EXPORT_TABLE) } connection.prepareStatement(INSERT_USER_SQL).use { ps -> members.forEach { (orgId, orgMembers) -> diff --git a/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateAppUsageSurveyData.kt b/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateAppUsageSurveyData.kt new file mode 100644 index 00000000..67b385fe --- /dev/null +++ b/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateAppUsageSurveyData.kt @@ -0,0 +1,406 @@ +package com.openlattice.mechanic.upgrades + +import com.geekbeast.postgres.PostgresArrays +import com.geekbeast.rhizome.configuration.RhizomeConfiguration +import com.openlattice.authorization.* +import com.openlattice.data.requests.NeighborEntityDetails +import com.openlattice.data.storage.MetadataOption +import com.openlattice.data.storage.postgres.PostgresEntityDataQueryService +import com.openlattice.datastore.services.EntitySetManager +import com.openlattice.edm.EdmConstants +import com.openlattice.edm.EdmConstants.Companion.LAST_WRITE_FQN +import com.openlattice.graph.PagedNeighborRequest +import com.openlattice.hazelcast.HazelcastMap +import com.openlattice.mechanic.Toolbox +import com.openlattice.organizations.roles.SecurePrincipalsManager +import com.openlattice.search.SearchService +import com.openlattice.search.requests.EntityNeighborsFilter +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import org.apache.olingo.commons.api.edm.FullQualifiedName +import org.slf4j.LoggerFactory +import java.time.OffsetDateTime +import java.util.* + +/** + * @author alfoncenzioka <alfonce@openlattice.com> + */ +class MigrateAppUsageSurveyData( + toolbox: Toolbox, + private val rhizomeConfiguration: RhizomeConfiguration, + private val authorizationService: AuthorizationManager, + private val principalService: SecurePrincipalsManager, + 
private val searchService: SearchService,
+        private val dataQueryService: PostgresEntityDataQueryService,
+        private val entitySetService: EntitySetManager
+) : Upgrade {
+    private val logger = LoggerFactory.getLogger(MigrateAppUsageSurveyData::class.java)
+
+    private val organizations = HazelcastMap.ORGANIZATIONS.getMap(toolbox.hazelcast)
+    private val appConfigs = HazelcastMap.APP_CONFIGS.getMap(toolbox.hazelcast)
+    private val entitySetIds = HazelcastMap.ENTITY_SETS.getMap(toolbox.hazelcast).values.associateBy { it.name }
+
+    private val studyEKIDByParticipantEKID: MutableMap<UUID, UUID> = mutableMapOf()
+    private val participantIdByEKID: MutableMap<UUID, String?> = mutableMapOf()
+    private val surveyDataByParticipantEKID: MutableMap<UUID, List<Map<FullQualifiedName, Set<Any>>>> = mutableMapOf()
+    private val studyIdByStudyEKID: MutableMap<UUID, UUID> = mutableMapOf()
+
+    companion object {
+        private val DATA_COLLECTION_APP_ID = UUID.fromString("c4e6d8fd-daf9-41e7-8c59-2a12c7ee0857")
+
+        private val LEGACY_ORG_ID = UUID.fromString("7349c446-2acc-4d14-b2a9-a13be39cff93")
+
+        private const val CHRONICLE_APP_ES_PREFIX = "chronicle_"
+        private const val DATA_COLLECTION_APP_PREFIX = "chronicle_data_collection_"
+        private const val USER_APPS = "userapps"
+        private const val USED_BY = "usedby"
+        private const val STUDIES = "studies"
+        private const val PARTICIPANTS = "participants"
+        private const val PARTICIPATED_IN = "participatedin"
+
+        private const val LEGACY_USER_APPS_ES = "chronicle_user_apps"
+        private const val LEGACY_STUDIES_ES = "chronicle_study"
+        private const val LEGACY_PARTICIPATED_IN_ES = "chronicle_participated_in"
+        private const val LEGACY_USED_BY_ES = "chronicle_used_by"
+
+        private val OL_ID_FQN = EdmConstants.ID_FQN
+        private val STRING_ID_FQN = FullQualifiedName("general.stringid")
+        private val PERSON_FQN = FullQualifiedName("nc.SubjectIdentification")
+        private val USER_FQN = FullQualifiedName("ol.user")
+        private val FULL_NAME_FQN = FullQualifiedName("general.fullname")
+        private val DATETIME_FQN = FullQualifiedName("ol.datetime")
+        private val TIMEZONE_FQN = FullQualifiedName("ol.timezone")
+        private val TITLE_FQN = FullQualifiedName("ol.title")
+
+        // table column names
+        private const val SUBMISSION_DATE = "submission_date"
+        private const val APPLICATION_LABEL = "application_label"
+        private const val APP_PACKAGE_NAME = "app_package_name"
+        private const val EVENT_TIMESTAMP = "event_timestamp"
+        private const val TIMEZONE = "timezone"
+        private const val USERS = "users"
+        private const val V2_STUDY_ID = "v2_study_id"
+        private const val PARTICIPANT_ID = "participant_id"
+        private const val V2_STUDY_EKID = "v2_study_ekid"
+
+        private val FQN_TO_COLUMNS = mapOf(
+                LAST_WRITE_FQN to SUBMISSION_DATE,
+                TITLE_FQN to APPLICATION_LABEL,
+                FULL_NAME_FQN to APP_PACKAGE_NAME,
+                DATETIME_FQN to EVENT_TIMESTAMP,
+                TIMEZONE_FQN to TIMEZONE,
+                USER_FQN to USERS
+        )
+
+        private val column_names = listOf(V2_STUDY_EKID, V2_STUDY_ID, PARTICIPANT_ID) + FQN_TO_COLUMNS.values.toList()
+        private val APP_USAGE_SURVEY_COLUMNS = column_names.joinToString(",") { it }
+        private val APP_USAGE_SURVEY_PARAMS = column_names.joinToString(",") { "?"
} + + private const val TABLE_NAME = "migrate_app_usage_survey" + + /** + * PreparedStatement bind order + * 1) studyEKID + * 2) studyId + * 3) participantId + * 4) submissionDate, + * 5) appLabel + * 6) packageName + * 7) timestamp + * 8) timezone + * 9) users + */ + private val INSERT_INTO_APP_USAGE_SQL = """ + INSERT INTO $TABLE_NAME($APP_USAGE_SURVEY_COLUMNS) values ($APP_USAGE_SURVEY_PARAMS) + ON CONFLICT DO NOTHING + """.trimIndent() + + private val CREATE_APP_USAGE_SURVEY_TABLE_SQL = """ + CREATE TABLE IF NOT EXISTS $TABLE_NAME( + $V2_STUDY_EKID uuid NOT NULL, + $V2_STUDY_ID uuid NOT NULL, + $PARTICIPANT_ID text NOT NULL, + $SUBMISSION_DATE timestamp with time zone NOT NULL, + $APPLICATION_LABEL text, + $APP_PACKAGE_NAME text NOT NULL, + $EVENT_TIMESTAMP timestamp with time zone NOT NULL, + $TIMEZONE text, + $USERS text[], + PRIMARY KEY($APP_PACKAGE_NAME, $EVENT_TIMESTAMP) + ); + """.trimIndent() + } + + init { + val hds = getDatasource() + hds.connection.createStatement().use { stmt -> + stmt.execute(CREATE_APP_USAGE_SURVEY_TABLE_SQL) + } + } + + override fun upgrade(): Boolean { + getAllAppUsageSurveyData() + + logger.info("migrating app usage surveys to v3") + + val hds = getDatasource() + try { + val written = hds.connection.use { connection -> + connection.prepareStatement(INSERT_INTO_APP_USAGE_SQL).use { ps -> + surveyDataByParticipantEKID.forEach { (key, data) -> + val studyEKID = studyEKIDByParticipantEKID[key] + val studyId = studyIdByStudyEKID[studyEKID] + val participantId = participantIdByEKID[key] + + if (studyEKID == null || participantId == null || studyId == null) { + logger.warn("Skipping migration for participant $key. failed to retrieve associated studyId or participantId") + return@forEach + } + + ps.setObject(1, studyEKID) + ps.setObject(2, studyId) + ps.setString(3, participantId) + + data.forEach data@{ entity -> + val users = entity.getOrDefault(USER_FQN, listOf()) + .map { it.toString() }.filter { it.isNotBlank() }.toList() + + if (users.isEmpty() || users.first().toString().isBlank()) { + return@data + } + + val submissionDate = getFirstValueOrNull(entity, LAST_WRITE_FQN) + val applicationLabels = getAllValuesOrNull(entity, TITLE_FQN) + val appPackageName = getFirstValueOrNull(entity, FULL_NAME_FQN) + val applicationLabel = when(applicationLabels.size) { + 0 -> appPackageName + 1 -> applicationLabels.first() + else -> applicationLabels.find { it != appPackageName } + } + val timezone = getFirstValueOrNull(entity, TIMEZONE_FQN) + + if (submissionDate == null || appPackageName == null) { + return@data + } + + val timestamps = entity[DATETIME_FQN]?.map { it.toString() }?.toSet() ?: setOf() + timestamps.forEach { timestamp -> + var index = 3 + ps.setObject(++index, OffsetDateTime.parse(submissionDate)) + ps.setString(++index, applicationLabel) + ps.setString(++index, appPackageName) + ps.setObject(++index, OffsetDateTime.parse(timestamp)) + ps.setString(++index, timezone) + ps.setArray(++index, PostgresArrays.createTextArray(connection, users)) + ps.addBatch() + } + } + } + ps.executeBatch().sum() + } + } + + logger.info("wrote {} entities to db. data query service returned {} entities", written, surveyDataByParticipantEKID.values.flatten().size) + return true + } catch (ex: Exception) { + logger.error("error migrating app usage survey data to v3", ex) + return false + } + } + + private fun getDatasource(): HikariDataSource { + val (hikariConfiguration) = rhizomeConfiguration.datasourceConfigurations["chronicle"]!! 
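+        // the "chronicle" datasource above must be present in the rhizome configuration; the !! lookup fails fast when it is missing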
+ val hc = HikariConfig(hikariConfiguration) + return HikariDataSource(hc) + } + + private fun getAllAppUsageSurveyData() { + val v2DataCollectionOrgIds = appConfigs.keys + .filter { it.appId == DATA_COLLECTION_APP_ID } + .map { it.organizationId } + .toMutableSet() + + val superUserPrincipals = getChronicleSuperUserPrincipals() + + (v2DataCollectionOrgIds + LEGACY_ORG_ID).forEach org@{ orgId -> + logger.info("======================================") + logger.info("getting app usage survey data for org $orgId") + logger.info("=====================================") + + // entity sets + val orgEntitySetIds = getOrgEntitySetIds(orgId) + + val studiesEntitySetId = orgEntitySetIds.getValue(STUDIES) + val participatedInEntitySetId = orgEntitySetIds.getValue(PARTICIPATED_IN) + val participantsEntitySetId = orgEntitySetIds[PARTICIPANTS] // this will be null if org is legacy + val userAppsEntitySetId = orgEntitySetIds.getValue(USER_APPS) + val usedByEntitySetId = orgEntitySetIds.getValue(USED_BY) + + val adminRoleAclKey = organizations[orgId]?.adminRoleAclKey + if (adminRoleAclKey == null) { + logger.warn("skipping {} since it doesn't have admin role", orgId) + return@org + } + val principals = principalService.getAllUsersWithPrincipal(adminRoleAclKey).map { it.principal }.toSet() + superUserPrincipals + + val studies = filterInvalidStudies(getEntitiesByEntityKeyId(studiesEntitySetId)) + if (studies.isEmpty()) { + logger.info("org {} has no studies. skipping", orgId) + return@org + } + logger.info("found {} studies in org {}", studies.size, orgId) + + val studyEntityKeyIds = studies.keys + val studyIdByStudyEKID = studies.mapValues { getFirstUUIDOrNull(it.value, STRING_ID_FQN)!! } + val studyIds = studyIdByStudyEKID.values.toSet() + + val participantEntitySetIds = when (orgId) { + LEGACY_ORG_ID -> getLegacyParticipantEntitySetIds(studyIds) + else -> setOf(participantsEntitySetId!!) + } + + val participants = getStudyParticipants(studiesEntitySetId, studyEntityKeyIds, participantEntitySetIds, participatedInEntitySetId, principals) + + val participantsByStudyEKID = participants.mapValues { (_, neighbor) -> neighbor.map { it.neighborId.get() }.toSet() } + logger.info("org {} participant count by study {}", orgId, participantsByStudyEKID.mapValues { it.value.size }) + + val participantIdByEKID = + participants.values + .flatten() + .associate { getFirstUUIDOrNull(it.neighborDetails.get(), OL_ID_FQN)!! 
to getFirstValueOrNull(it.neighborDetails.get(), PERSON_FQN) } + val studyEKIDByParticipantEKID = participantsByStudyEKID + .map { (studyId, participants) -> participants.associateWith { studyId } } + .flatMap { + it.asSequence() + }.associate { it.key to it.value } + + val surveyDataByParticipantEKID = getSurveysDataByParticipant( + principals, participantsByStudyEKID.values.flatten().toSet(), participantEntitySetIds, usedByEntitySetId, userAppsEntitySetId) + + logger.info("found {} survey entities in org {}", surveyDataByParticipantEKID.values.flatten().size, orgId) + + this.studyEKIDByParticipantEKID += studyEKIDByParticipantEKID + this.participantIdByEKID += participantIdByEKID + this.surveyDataByParticipantEKID += surveyDataByParticipantEKID + this.studyIdByStudyEKID += studyIdByStudyEKID + } + } + + private fun getChronicleSuperUserPrincipals(): Set { + val userId = "auth0|5ae9026c04eb0b243f1d2bb6" + val securablePrincipal = principalService.getSecurablePrincipal(userId) + return principalService.getAllPrincipals(securablePrincipal).map { it.principal }.toSet() + Principal(PrincipalType.USER, userId) + } + + private fun filterInvalidStudies(entities: Map>>): Map>> { + return entities.filterValues { getFirstUUIDOrNull(it, STRING_ID_FQN) != null } + } + + // returns a map of collection template name to entity set id + private fun getOrgEntitySetIds(organizationId: UUID): Map { + val entitySetNameByTemplateName = when (organizationId) { + LEGACY_ORG_ID -> mapOf( + USER_APPS to LEGACY_USER_APPS_ES, + USED_BY to LEGACY_USED_BY_ES, + STUDIES to LEGACY_STUDIES_ES, + PARTICIPATED_IN to LEGACY_PARTICIPATED_IN_ES + ) + else -> { + val orgIdToStr = organizationId.toString().replace("-", "") + mapOf( + USER_APPS to "$DATA_COLLECTION_APP_PREFIX${orgIdToStr}_$USER_APPS", + USED_BY to "$DATA_COLLECTION_APP_PREFIX${orgIdToStr}_$USED_BY", + STUDIES to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_$STUDIES", + PARTICIPATED_IN to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_$PARTICIPATED_IN", + PARTICIPANTS to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_${PARTICIPANTS}" + ) + } + } + + return entitySetNameByTemplateName.mapValues { entitySetIds.getValue(it.value).id } + } + + private fun getLegacyParticipantEntitySetIds(studyIds: Set): Set { + val entitySetNames = studyIds.map { "chronicle_participants_$it" } + return entitySetNames.mapNotNull { entitySetIds[it]?.id }.toSet() + } + + private fun getStudyParticipants( + studiesEntitySetId: UUID, + studyEntityKeyIds: Set, + participantEntitySetIds: Set, + participatedInEntitySetId: UUID, + principals: Set + + ): Map> { + val filter = EntityNeighborsFilter( + studyEntityKeyIds, Optional.of(participantEntitySetIds), Optional.of(setOf(studiesEntitySetId)), Optional.of(setOf(participatedInEntitySetId))) + return searchService.executeEntityNeighborSearch(setOf(studiesEntitySetId), PagedNeighborRequest(filter), principals).neighbors + } + + private fun getSurveysDataByParticipant( + principals: Set, + entityKeyIds: Set, + participantEntitySetIds: Set, + usedByEnEntitySetId: UUID, + userAppsEntitySetId: UUID): Map>>> { + + val filter = EntityNeighborsFilter(entityKeyIds, Optional.of(setOf(userAppsEntitySetId)), Optional.of(participantEntitySetIds), Optional.of(setOf(usedByEnEntitySetId))) + + val neighbors = searchService + .executeEntityNeighborSearch(participantEntitySetIds, PagedNeighborRequest(filter), principals).neighbors + .mapValues { (_, v) -> + v.associateBy { getFirstUUIDOrNull(it.associationDetails, OL_ID_FQN)!! 
}
+                }
+
+        val associationEntityKeyIds = neighbors.values.flatMap { it.keys }.toSet()
+        val usedByEntities = getEntitiesByEntityKeyId(usedByEnEntitySetId, associationEntityKeyIds, setOf(MetadataOption.LAST_WRITE))
+
+        return neighbors
+                .mapValues { (_, v) ->
+                    v.map {
+                        it.value.neighborDetails.get().toMutableMap() + usedByEntities.getOrDefault(it.key, mapOf())
+                    }
+                }
+    }
+
+    private fun getFirstUUIDOrNull(entity: Map<FullQualifiedName, Set<Any>>, fqn: FullQualifiedName): UUID? {
+        return when (val string = getFirstValueOrNull(entity, fqn)) {
+            null -> null
+            else -> UUID.fromString(string)
+        }
+    }
+
+    private fun getAllValuesOrNull(entity: Map<FullQualifiedName, Set<Any>>, fqn: FullQualifiedName): Set<String> {
+        entity[fqn]?.let { it ->
+            return it.mapNotNull { it.toString() }.toSet()
+        }
+        return setOf()
+    }
+
+    private fun getFirstValueOrNull(entity: Map<FullQualifiedName, Set<Any>>, fqn: FullQualifiedName): String? {
+        entity[fqn]?.iterator()?.let {
+            if (it.hasNext()) return it.next().toString()
+        }
+        return null
+    }
+
+    private fun getEntitiesByEntityKeyId(
+            entitySetId: UUID,
+            entityKeyIds: Set<UUID> = setOf(),
+            metadataOptions: Set<MetadataOption> = setOf()
+    ): Map<UUID, Map<FullQualifiedName, Set<Any>>> {
+        return dataQueryService.getEntitiesWithPropertyTypeFqns(
+                mapOf(entitySetId to Optional.of(entityKeyIds)),
+                entitySetService.getPropertyTypesOfEntitySets(setOf(entitySetId)),
+                mapOf(),
+                metadataOptions,
+                Optional.empty(),
+                false,
+        )
+    }
+
+    override fun getSupportedVersion(): Long {
+        return Version.V2021_07_23.value
+    }
+}
diff --git a/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateChronicleParticipantStats.kt b/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateChronicleParticipantStats.kt
new file mode 100644
index 00000000..1e56aefd
--- /dev/null
+++ b/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateChronicleParticipantStats.kt
@@ -0,0 +1,508 @@
+package com.openlattice.mechanic.upgrades
+
+import com.geekbeast.postgres.PostgresDatatype
+import com.geekbeast.rhizome.configuration.RhizomeConfiguration
+import com.openlattice.authorization.*
+import com.openlattice.data.requests.NeighborEntityDetails
+import com.openlattice.data.storage.postgres.PostgresEntityDataQueryService
+import com.openlattice.datastore.services.EntitySetManager
+import com.openlattice.edm.EdmConstants
+import com.openlattice.graph.PagedNeighborRequest
+import com.openlattice.hazelcast.HazelcastMap
+import com.openlattice.mechanic.Toolbox
+import com.openlattice.organizations.roles.SecurePrincipalsManager
+import com.openlattice.search.SearchService
+import com.openlattice.search.requests.EntityNeighborsFilter
+import com.zaxxer.hikari.HikariConfig
+import com.zaxxer.hikari.HikariDataSource
+import org.apache.olingo.commons.api.edm.FullQualifiedName
+import org.slf4j.LoggerFactory
+import java.time.LocalDate
+import java.time.OffsetDateTime
+import java.util.*
+
+/**
+ * @author alfoncenzioka <alfonce@openlattice.com>
+ */
+class MigrateChronicleParticipantStats(
+        val toolbox: Toolbox,
+        private val searchService: SearchService,
+        private val principalService: SecurePrincipalsManager,
+        private val entitySetService: EntitySetManager,
+        private val dataQueryService: PostgresEntityDataQueryService,
+        private val rhizomeConfiguration: RhizomeConfiguration,
+        private val authorizationService: AuthorizationManager
+) : Upgrade {
+
+    // entity set name -> entity set id
+    private val entitySetIds: Map<String, UUID> = HazelcastMap.ENTITY_SETS.getMap(toolbox.hazelcast).associate { it.value.name to it.key }
+    private val organizations = HazelcastMap.ORGANIZATIONS.getMap(toolbox.hazelcast)
+
+    private val entities: MutableList<ParticipantStats> = mutableListOf()
+
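+    // v2 entity set / FQN constants and the SQL for the Postgres staging table this upgrade writes to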
+ companion object { + private val logger = LoggerFactory.getLogger(MigrateChronicleParticipantStats::class.java) + + private const val SUPER_USER_PRINCIPAL_ID = "auth0|5ae9026c04eb0b243f1d2bb6" + + private val LEGACY_ORG_ID = UUID.fromString("7349c446-2acc-4d14-b2a9-a13be39cff93") + private val DATA_COLLECTION_APP_ID = UUID.fromString("c4e6d8fd-daf9-41e7-8c59-2a12c7ee0857") + private val SURVEY_APP_ID = UUID.fromString("bb44218b-515a-4314-b955-df2c991b2575") + + private const val CHRONICLE_APP_ES_PREFIX = "chronicle_" + private const val SURVEYS_APP_ES_PREFIX = "chronicle_surveys_" + + // collection template names + private const val STUDIES_TEMPLATE = "studies" + private const val PARTICIPATED_IN_TEMPLATE = "participatedin" + private const val RESPONDS_WITH_TEMPLATE = "respondswith" + private const val SUBMISSION_TEMPLATE = "submission" + private const val PARTICIPANTS_TEMPLATE = "participants" + private const val HAS_TEMPLATE = "has" + private const val METADATA_TEMPLATE = "metadata" + + // legacy entity set names + private const val LEGACY_STUDIES_ES = "chronicle_study" + private const val LEGACY_PARTICIPATED_IN_ES = "chronicle_participated_in" + private const val LEGACY_RECORDED_BY_ES = "chronicle_recorded_by" + private const val LEGACY_HAS_ES = "chronicle_has" + private const val LEGACY_METADATA_ES = "chronicle_metadata" + + // entity sets lookup name + private const val STUDIES_ES = "studies" + private const val PARTICIPATED_IN_ES = "participatedIn" + private const val PARTICIPANTS_ES = "participants" + private const val SUBMISSION_ES = "submission" + private const val RESPONDS_WITH_ES = "respondsWith" + private const val HAS_ES = "has" + private const val METADATA_ES = "metadata" + + private val OL_ID_FQN = EdmConstants.ID_FQN + private val STRING_ID_FQN = FullQualifiedName("general.stringid") + private val PERSON_FQN = FullQualifiedName("nc.SubjectIdentification") + private val FULL_NAME_FQN = FullQualifiedName("general.fullname") + private val DATE_TIME_START_FQN = FullQualifiedName("ol.datetimestart") + private val DATE_TIME_END_FQN = FullQualifiedName("ol.datetimeend") + private val DATETIME_FQN = FullQualifiedName("ol.datetime") + private val RECORDED_DATE_FQN = FullQualifiedName("ol.recordeddate") + + private const val TABLE_NAME = "migrate_participant_stats" + + // column names + private const val ORGANIZATION_IO = "organization_id" + private const val V2_STUDY_ID = "study_id" + private const val PARTICIPANT_ID = "participant_id" + private const val ANDROID_FIRST_DATE = "android_first_date" + private const val ANDROID_LAST_DATE = "android_last_date" + private const val ANDROID_UNIQUE_DATES = "android_unique_dates" + private const val TUD_FIRST_DATE = "tud_first_date" + private const val TUD_LAST_DATE = "tud_last_date" + private const val TUD_UNIQUE_DATES = "tud_unique_dates" + private const val IOS_FIRST_DATE = "ios_first_date" + private const val IOS_LAST_DATE = "ios_last_date" + private const val IOS_UNIQUE_DATES = "ios_unique_dates" + + private val CREATE_STATS_TABLE_SQL = """ + CREATE TABLE IF NOT EXISTS $TABLE_NAME( + $ORGANIZATION_IO uuid NOT NULL, + $V2_STUDY_ID uuid NOT NULL, + $PARTICIPANT_ID text NOT NULL, + $ANDROID_FIRST_DATE timestamp with time zone, + $ANDROID_LAST_DATE timestamp with time zone, + $ANDROID_UNIQUE_DATES date[] default '{}', + $TUD_FIRST_DATE timestamp with time zone, + $TUD_LAST_DATE timestamp with time zone, + $TUD_UNIQUE_DATES date[] default '{}', + $IOS_FIRST_DATE timestamp with time zone, + $IOS_LAST_DATE timestamp with time zone, + 
$IOS_UNIQUE_DATES date[] default '{}', + PRIMARY KEY($V2_STUDY_ID, $PARTICIPANT_ID) + ) + """.trimIndent() + + private val COLS = setOf( + ORGANIZATION_IO, + V2_STUDY_ID, + PARTICIPANT_ID, + ANDROID_FIRST_DATE, + ANDROID_LAST_DATE, + ANDROID_UNIQUE_DATES, + TUD_FIRST_DATE, + TUD_LAST_DATE, + TUD_UNIQUE_DATES + ) + + private val PARTICIPANT_STATS_COLS = COLS.joinToString { it } + private val PARTICIPANT_STATS_PARAMS = COLS.joinToString { "?" } + + /**PreparedStatement bind order + * 1) organizationId + * 2) studyEntityKeyId, + * 3) studyId, + * 4) participantId + * 5) androidFirstDate + * 6) androidLastDate + * 7) androidUniqueDates, + * 8) tudFirstDate, + * 9) tudLastDate + * 10) tudUniqueDates + */ + private val INSERT_PARTICIPANT_STATS_SQL = """ + INSERT INTO $TABLE_NAME ($PARTICIPANT_STATS_COLS) values ($PARTICIPANT_STATS_PARAMS) + ON CONFLICT DO NOTHING + """.trimIndent() + } + + init { + // create table + val hds = getHikariDataSource() + hds.connection.createStatement().use { stmt -> stmt.execute(CREATE_STATS_TABLE_SQL) } + } + + + override fun upgrade(): Boolean { + getEntitiesToInsert() + val totalWritten = writeEntitiesToTable() + logger.info("Exported $totalWritten entities to participant stats table. Expected to export ${entities.size} entities") + return true + } + + + override fun getSupportedVersion(): Long { + return Version.V2021_07_23.value + } + + private fun getHikariDataSource(): HikariDataSource { + val (hikariConfiguration) = rhizomeConfiguration.datasourceConfigurations["chronicle"]!! + val hc = HikariConfig(hikariConfiguration) + return HikariDataSource(hc) + } + + private fun getEntitiesToInsert() { + + val orgIdsByAppId = getOrgIdsByAppId().toMutableMap() + orgIdsByAppId.getValue(DATA_COLLECTION_APP_ID).add(LEGACY_ORG_ID) + val superUserPrincipals = getChronicleSuperUserPrincipals() + + logger.info("Using principals: $superUserPrincipals") + + (orgIdsByAppId.values.flatten().toSet()).forEach { orgId -> + logger.info("---------------------------------------------") + logger.info("Retrieving entities in organization $orgId") + logger.info("----------------------------------------------") + + val adminRoleAclKey = organizations[orgId]?.adminRoleAclKey + if (adminRoleAclKey == null) { + logger.warn("skipping {} since it doesn't have admin role", orgId) + return@forEach + } + + // step 1: get studies in org: {studyEntityKeyId -> study} + val entitySets = getOrgEntitySetNames(orgId) + val studies: Map = getOrgStudies(entitySetId = entitySets.getValue(STUDIES_ES)) + if (studies.isEmpty()) { + logger.info("organization $orgId has no studies. Skipping") + return@forEach + } + logger.info("Retrieved ${studies.size} studies") + + // step 2: get all participants in org + val participantEntitySets = when (orgId) { + LEGACY_ORG_ID -> getLegacyParticipantEntitySetIds(studies.values.map { it.studyId }.toSet()) + else -> setOf(entitySets.getValue(PARTICIPANTS_ES)) + } + logger.info("participant entity sets: $participantEntitySets") + + val participants = getOrgParticipants( + participantEntitySetIds = participantEntitySets, + studiesEntitySetId = entitySets.getValue(STUDIES_ES), + entityKeyIds = studies.keys, + principals = superUserPrincipals, + edgeEntitySetId = entitySets.getValue(PARTICIPATED_IN_ES) + ).toMutableMap() + + if (participants.values.isEmpty()) { + logger.info("no participants found in org. 
Skipping participant stats fetch") + return@forEach + } + + logger.info("Retrieved ${participants.values.flatten().size} participants") + logger.info("Participant count by study: ${participants.map { studies.getValue(it.key).title to it.value.size }.toMap()}") + + // step 3: neighbor search on participant entity set + val participantStats = getParticipantStats( + participantEntitySets = participantEntitySets, + entitySetIds = entitySets, + orgIdsByAppId = orgIdsByAppId, + orgId = orgId, + principals = superUserPrincipals, + participantById = participants.values.flatten().associateBy { it.id }, + studies = studies + ) + + logger.info("Participant stats entities by study: ${participantStats.map { studies.getValue(it.key).title to it.value.size }.toMap()}") + entities.addAll(participantStats.values.flatten()) + + } + + logger.info("Total entities to write: ${entities.size}") + } + + private fun writeEntitiesToTable(): Int { + return getHikariDataSource().connection.use { connection -> + try { + val wc = connection.prepareStatement(INSERT_PARTICIPANT_STATS_SQL).use { ps -> + entities.forEach { + var index = 0 + ps.setObject(++index, it.organizationId) + ps.setObject(++index, it.studyId) + ps.setString(++index, it.participantId) + ps.setObject(++index, it.androidFirstDate) + ps.setObject(++index, it.androidLastDate) + ps.setArray(++index, connection.createArrayOf(PostgresDatatype.DATE.sql(), it.androidUniqueDates.toTypedArray())) + ps.setObject(++index, it.tudFirstDate) + ps.setObject(++index, it.tudLastDate) + ps.setArray(++index, connection.createArrayOf(PostgresDatatype.DATE.sql(), it.tudUniqueDates.toTypedArray())) + ps.addBatch() + } + ps.executeBatch().sum() + } + return@use wc + } catch (ex: Exception) { + throw ex + } + } + } + + private fun getOrgStudies(entitySetId: UUID): Map { + return dataQueryService.getEntitiesWithPropertyTypeFqns( + mapOf(entitySetId to Optional.empty()), + entitySetService.getPropertyTypesOfEntitySets(setOf(entitySetId)), + mapOf(), + setOf(), + Optional.empty(), + false + ) + .filter { getFirstUUIDOrNull(it.value, STRING_ID_FQN) != null } + .mapValues { getStudyEntity(it.key, it.value) } + } + + // Returns a mapping from studyEntityKeyId to list of participants + private fun getOrgParticipants( + participantEntitySetIds: Set, + edgeEntitySetId: UUID, + studiesEntitySetId: UUID, + entityKeyIds: Set, + principals: Set + ) + : Map> { + val filter = EntityNeighborsFilter(entityKeyIds, Optional.of(participantEntitySetIds), Optional.empty(), Optional.of(setOf(edgeEntitySetId))) + + return searchService + .executeEntityNeighborSearch(setOf(studiesEntitySetId), PagedNeighborRequest(filter), principals) + .neighbors + .mapValues { it.value.map { neighbor -> getParticipantFromNeighborEntity(it.key, neighbor) }.toSet() } + + } + + // mapping from studyEntityKeyId to a list of participant stats objects + private fun getParticipantStats( + participantEntitySets: Set, + entitySetIds: Map, + orgIdsByAppId: Map>, + orgId: UUID, + principals: Set, + participantById: Map, + studies: Map, + ): Map> { + + val srcEntitySetIds: MutableSet = participantEntitySets.toMutableSet() + val edgeEntitySetIds: MutableSet = mutableSetOf(entitySetIds.getValue(HAS_ES)) + val dstEntitySetIds: MutableSet = mutableSetOf(entitySetIds.getValue(METADATA_ES)) + + if (isAppIdInOrg(orgId, SURVEY_APP_ID, orgIdsByAppId)) { + edgeEntitySetIds.add(entitySetIds.getValue(RESPONDS_WITH_ES)) + dstEntitySetIds.add(entitySetIds.getValue(SUBMISSION_ES)) + } + + val filter = EntityNeighborsFilter( + 
participantById.keys, + Optional.of(srcEntitySetIds), + Optional.of(dstEntitySetIds), + Optional.of(edgeEntitySetIds) + ) + + return searchService.executeEntityNeighborSearch(participantEntitySets, PagedNeighborRequest(filter), principals) + .neighbors + .mapValues { (id, neighbors) -> + val neighborsByAssociationES = neighbors.groupBy { it.associationEntitySet.id } + val androidStats = getParticipantAndroidStats(neighborsByAssociationES[entitySetIds.getValue(HAS_ES)]) + val tudStats = getParticipantTudStats(neighborsByAssociationES[entitySetIds[RESPONDS_WITH_ES]]) // not every org has respondsWith entity set + + val studyEntityKeyId = participantById.getValue(id).studyEntityKeyId + ParticipantStats( + organizationId = orgId, + studyEntityKeyId = studyEntityKeyId, + studyId = studies.getValue(studyEntityKeyId).studyId, + participantId = participantById.getValue(id).participantId!!, + androidFirstDate = androidStats.first, + androidLastDate = androidStats.second, + androidUniqueDates = androidStats.third, + tudFirstDate = tudStats.first, + tudLastDate = tudStats.second, + tudUniqueDates = tudStats.third + ) + }.values.groupBy { it.studyEntityKeyId } + } + + // start, end date, count + // in theory each participant should only have a single NeighborEntityDetails in the metadata entity set, + // but some might have multiple entities + private fun getParticipantAndroidStats(neighbors: List?): Triple> { + + if (neighbors == null || neighbors.isEmpty()) { + return Triple(null, null, setOf()) + } + + val dateTimeStartValues = getOffsetDateTimesFromNeighborEntities(neighbors, DATE_TIME_START_FQN) + val dateTimeEndValues = getOffsetDateTimesFromNeighborEntities(neighbors, DATE_TIME_END_FQN) + val datesRecorded = getOffsetDateTimesFromNeighborEntities(neighbors, RECORDED_DATE_FQN) + + val firstDate = dateTimeStartValues.stream().min(OffsetDateTime::compareTo) + val lastDate = dateTimeEndValues.stream().max(OffsetDateTime::compareTo) + + return Triple( + first = if (firstDate.isEmpty) null else firstDate.get(), + second = if (lastDate.isEmpty) null else lastDate.get(), + third = datesRecorded.map { it.toLocalDate() }.toSet() // unique dates + ) + } + + // start date, end date, count + private fun getParticipantTudStats(neighbors: List?): Triple> { + if (neighbors == null) return Triple(null, null, setOf()) + + val dateTimeValues = getOffsetDateTimesFromNeighborEntities(neighbors, DATETIME_FQN) + + return Triple( + first = dateTimeValues.stream().min(OffsetDateTime::compareTo).get(), + second = dateTimeValues.stream().max(OffsetDateTime::compareTo).get(), + third = dateTimeValues.map { it.toLocalDate() }.toSet() // unique dates + ) + } + + // returns a mapping from appId to setOf organizations containing app + private fun getOrgIdsByAppId(): Map> { + return HazelcastMap.APP_CONFIGS.getMap(toolbox.hazelcast).keys + .filter { it.appId == DATA_COLLECTION_APP_ID || it.appId == SURVEY_APP_ID } + .groupBy { it.appId } + .mapValues { it.value.map { config -> config.organizationId }.toMutableSet() } + } + + + private fun getOffsetDateTimesFromNeighborEntities(entities: List, fqn: FullQualifiedName): Set { + return entities + .map { getAllValuesOrNull(it.neighborDetails.get(), fqn) } + .flatten().map { OffsetDateTime.parse(it) }.toSet() + } + + private fun isAppIdInOrg(orgId: UUID, appId: UUID, orgIdsByAppId: Map>): Boolean { + return orgIdsByAppId.getValue(appId).contains(orgId) + } + + private fun getChronicleSuperUserPrincipals(): Set { + val securablePrincipal = 
principalService.getSecurablePrincipal(SUPER_USER_PRINCIPAL_ID)
+        return principalService.getAllPrincipals(securablePrincipal).map { it.principal }.toSet() + Principal(PrincipalType.USER, SUPER_USER_PRINCIPAL_ID)
+    }
+
+    private fun getLegacyParticipantEntitySetIds(studyIds: Set<UUID>): Set<UUID> {
+        val entitySetNames = studyIds.map { "chronicle_participants_$it" }
+        return entitySetNames.mapNotNull { entitySetIds[it] }.toSet()
+    }
+
+    private fun getOrgEntitySetNames(orgId: UUID): Map<String, UUID> {
+        val entitySetNameByTemplateName = when (orgId) {
+            LEGACY_ORG_ID -> mapOf(
+                    STUDIES_ES to LEGACY_STUDIES_ES,
+                    PARTICIPATED_IN_ES to LEGACY_PARTICIPATED_IN_ES,
+                    HAS_ES to LEGACY_HAS_ES,
+                    METADATA_ES to LEGACY_METADATA_ES
+            )
+            else -> {
+                val orgIdToStr = orgId.toString().replace("-", "")
+                mapOf(
+                        STUDIES_ES to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_$STUDIES_TEMPLATE",
+                        HAS_ES to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_$HAS_TEMPLATE",
+                        METADATA_ES to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_$METADATA_TEMPLATE",
+                        PARTICIPATED_IN_ES to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_$PARTICIPATED_IN_TEMPLATE",
+                        PARTICIPANTS_ES to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_${PARTICIPANTS_TEMPLATE}",
+                        SUBMISSION_ES to "$SURVEYS_APP_ES_PREFIX${orgIdToStr}_${SUBMISSION_TEMPLATE}",
+                        RESPONDS_WITH_ES to "$SURVEYS_APP_ES_PREFIX${orgIdToStr}_${RESPONDS_WITH_TEMPLATE}",
+                )
+            }
+        }
+
+        return entitySetNameByTemplateName.filter { entitySetIds.keys.contains(it.value) }.mapValues { entitySetIds.getValue(it.value) }
+    }
+
+    private fun getParticipantFromNeighborEntity(studyEntityKeyId: UUID, entity: NeighborEntityDetails): Participant {
+        val id = getFirstUUIDOrNull(entity.neighborDetails.get(), OL_ID_FQN)
+        val participantId = getFirstValueOrNull(entity.neighborDetails.get(), PERSON_FQN)
+
+        return Participant(studyEntityKeyId, id!!, participantId!!) // hope this force unwrapping doesn't throw NPE
+
+    }
+
+    private fun getFirstUUIDOrNull(entity: Map<FullQualifiedName, Set<Any>>, fqn: FullQualifiedName): UUID? {
+        return when (val string = getFirstValueOrNull(entity, fqn)) {
+            null -> null
+            else -> UUID.fromString(string)
+        }
+    }
+
+    private fun getAllValuesOrNull(entity: Map<FullQualifiedName, Set<Any>>, fqn: FullQualifiedName): Set<String> {
+        entity[fqn]?.let { it ->
+            return it.mapNotNull { it.toString() }.toSet()
+        }
+        return setOf()
+    }
+
+    private fun getFirstValueOrNull(entity: Map<FullQualifiedName, Set<Any>>, fqn: FullQualifiedName): String? {
+        entity[fqn]?.iterator()?.let {
+            if (it.hasNext()) return it.next().toString()
+        }
+        return null
+    }
+
+    private fun getStudyEntity(studyEntityKeyId: UUID, entity: Map<FullQualifiedName, Set<Any>>): Study {
+        val title = getFirstValueOrNull(entity, FULL_NAME_FQN)
+        val studyId = getFirstUUIDOrNull(entity, STRING_ID_FQN)
+        return Study(studyEntityKeyId, studyId!!, title = title)
+    }
+}
+
+data class Participant(
+        val studyEntityKeyId: UUID,
+        val id: UUID,
+        val participantId: String?,
+)
+
+private data class ParticipantStats(
+        val organizationId: UUID,
+        val studyEntityKeyId: UUID,
+        val studyId: UUID,
+        val participantId: String,
+        val androidFirstDate: Any?,
+        val androidLastDate: Any?,
+        val androidUniqueDates: Set<LocalDate> = setOf(),
+        val tudFirstDate: Any?,
+        val tudLastDate: Any?,
+        val tudUniqueDates: Set<LocalDate> = setOf()
+)
+
+data class Study(
+        val studyEntityKeyId: UUID,
+        val studyId: UUID,
+        val settings: Map<String, Any> = mapOf(),
+        val title: String?
= "" +) \ No newline at end of file diff --git a/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateChronicleSystemApps.kt b/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateChronicleSystemApps.kt index 14c37b6c..a34cc4a5 100644 --- a/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateChronicleSystemApps.kt +++ b/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateChronicleSystemApps.kt @@ -35,15 +35,16 @@ class MigrateChronicleSystemApps( private val FULL_NAME_FQN = FullQualifiedName("general.fullname") private val RECORD_TYPE_FQN = FullQualifiedName("ol.recordtype") + private const val TABLE_NAME = "migrate_system_apps" private val CREATE_SYSTEM_APPS_SQL = """ - CREATE TABLE IF NOT EXISTS public.system_apps( + CREATE TABLE IF NOT EXISTS $TABLE_NAME( app_package_name text NOT NULL, PRIMARY KEY (app_package_name) ) """.trimIndent() private val INSERT_SYSTEM_APPS_SQL = """ - INSERT INTO public.system_apps values(?) ON CONFLICT DO NOTHING + INSERT INTO $TABLE_NAME values(?) ON CONFLICT DO NOTHING """.trimIndent() } diff --git a/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateOrgSettingsToStudies.kt b/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateOrgSettingsToStudies.kt new file mode 100644 index 00000000..833a0de9 --- /dev/null +++ b/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateOrgSettingsToStudies.kt @@ -0,0 +1,215 @@ +package com.openlattice.mechanic.upgrades + +import com.geekbeast.mappers.mappers.ObjectMappers +import com.geekbeast.rhizome.configuration.RhizomeConfiguration +import com.openlattice.data.storage.postgres.PostgresEntityDataQueryService +import com.openlattice.datastore.services.EntitySetManager +import com.openlattice.edm.EdmConstants +import com.openlattice.hazelcast.HazelcastMap +import com.openlattice.mechanic.Toolbox +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import org.apache.olingo.commons.api.edm.FullQualifiedName +import org.slf4j.LoggerFactory +import java.util.* + +/** + * @author alfoncenzioka <alfonce@openlattice.com> + */ +class MigrateOrgSettingsToStudies( + val toolbox: Toolbox, + private val entitySetsManager: EntitySetManager, + private val rhizomeConfiguration: RhizomeConfiguration, + private val dataQueryService: PostgresEntityDataQueryService + +) : Upgrade { + private val entitySetIdsByName: Map = HazelcastMap.ENTITY_SETS.getMap(toolbox.hazelcast).values.associate { it.name to it.id } + + companion object { + private val logger = LoggerFactory.getLogger(MigrateOrgSettingsToStudies::class.java) + private val mapper = ObjectMappers.getJsonMapper() + + // app ids + private val DATA_COLLECTION_APP_ID = UUID.fromString("c4e6d8fd-daf9-41e7-8c59-2a12c7ee0857") + private val SURVEY_APP_ID = UUID.fromString("bb44218b-515a-4314-b955-df2c991b2575") + private val CHRONICLE_APP_ID = UUID.fromString("82e5504b-4dca-4600-a321-fa8e00e3b788") + + private val LEGACY_ORG_ID = UUID.fromString("7349c446-2acc-4d14-b2a9-a13be39cff93") + private val RICE_UNIVERSITY_ORG_ID = UUID.fromString("a77a8f87-9e3f-4ae1-bfb1-c5c72f194fa8") + + private const val LEGACY_STUDY_ENTITY_SET = "chronicle_study" + + private val OL_ID_FQN = EdmConstants.ID_FQN + private val STRING_ID_FQN = FullQualifiedName("general.stringid") + + private val appIdToComponentMapping = mapOf( + DATA_COLLECTION_APP_ID to AppComponents.CHRONICLE_DATA_COLLECTION, + SURVEY_APP_ID to AppComponents.CHRONICLE_SURVEYS, + CHRONICLE_APP_ID to AppComponents.CHRONICLE + ) + + // settings keys + private const val 
APP_FREQUENCY_SETTING = "appUsageFrequency" + private const val COMPONENTS_SETTING = "components" + + // columns to insert + private const val LEGACY_STUDY_ID = "v2_study_id" + private const val LEGACY_STUDY_EK_ID = "v2_study_ekid" + private const val SETTINGS = "settings" + + private const val STUDY_SETTINGS_TABLE = "migrate_study_settings" + + private val CREATE_TABLE_QUERY = """ + CREATE TABLE IF NOT EXISTS $STUDY_SETTINGS_TABLE( + $LEGACY_STUDY_EK_ID uuid NOT NULL, + $LEGACY_STUDY_ID uuid NOT NULL, + $SETTINGS jsonb, + PRIMARY KEY ($LEGACY_STUDY_EK_ID) + ) + """.trimIndent() + + /** + * PreparedStatement bind order + * 1) studyEntityKeyId + * 2) studyId + * 3) settings + */ + private val INSERT_INTO_SETTINGS_TABLE_QUERY = """ + INSERT INTO $STUDY_SETTINGS_TABLE ($LEGACY_STUDY_EK_ID, $LEGACY_STUDY_ID, $SETTINGS) VALUES (?, ?, ?::jsonb) + ON CONFLICT DO NOTHING + """.trimIndent() + } + + init { + getDataSource().connection.createStatement().use { stmt -> + stmt.execute(CREATE_TABLE_QUERY) + } + } + + override fun upgrade(): Boolean { + val appConfigs = HazelcastMap.APP_CONFIGS.getMap(toolbox.hazelcast) + + val appIdsByOrganizationId: MutableMap> = appConfigs.keys + .filter { it.appId == DATA_COLLECTION_APP_ID || it.appId == SURVEY_APP_ID || it.appId == CHRONICLE_APP_ID } + .groupBy { it.organizationId } + .mapValues { (_, configs) -> configs.map { it.appId }.toSet() } + .toMutableMap() + + // legacy org id should have all app components + appIdsByOrganizationId[LEGACY_ORG_ID] = appIdToComponentMapping.keys + + val studiesByOrgId: Map> = getStudiesByOrgId(appIdsByOrganizationId.keys, appIdsByOrganizationId) + logger.info("Entities to write by org: ${studiesByOrgId.mapValues { it.value.size }}") + + val rowsWritten = writeEntitiesToTable(studiesByOrgId) + logger.info("Wrote $rowsWritten study setting entities to table") + return true + } + + private fun writeEntitiesToTable(entities: Map>): Int { + return getDataSource().connection.use { connection -> + try { + val wc = connection.prepareStatement(INSERT_INTO_SETTINGS_TABLE_QUERY).use { ps -> + entities.values.flatten().forEach { + ps.setObject(1, it.studyEntityKeyId) + ps.setObject(2, it.studyId) + ps.setString(3, mapper.writeValueAsString(it.settings)) + ps.addBatch() + } + ps.executeBatch().sum() + } + return@use wc + } catch (ex: Exception) { + throw ex + } + } + } + + private fun getDataSource(): HikariDataSource { + val (hikariConfiguration) = rhizomeConfiguration.datasourceConfigurations["chronicle"]!! 
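+        // same "chronicle" datasource convention as the other migrations in this diff; used only for the staging-table writes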
+ val hc = HikariConfig(hikariConfiguration) + return HikariDataSource(hc) + } + + private fun getStudiesByOrgId(orgIds: Set, appIdsByOrganizationId: Map>): Map> { + val studyIdEntitySetIdByOrgId = getEntitySetIds(orgIds) + + return orgIds.associateWith { orgId -> + getStudyEntities( + orgId, + appIdsByOrganizationId.getValue(orgId), + studyIdEntitySetIdByOrgId.getValue(orgId) + ) + } + } + + private fun getStudyEntities(orgId: UUID, appIds: Set, entitySetId: UUID): List { + return dataQueryService.getEntitiesWithPropertyTypeFqns( + mapOf(entitySetId to Optional.empty()), + entitySetsManager.getPropertyTypesOfEntitySets(setOf(entitySetId)), + mapOf(), + setOf(), + Optional.empty(), + false + ).values.filter { getFirstUUIDOrNull(it, STRING_ID_FQN) != null } .map { getStudyEntity(it, appIds, orgId) }.toList() + } + + private fun getStudyEntity(entity: Map>, appIds: Set, orgId: UUID): Study { + val studyId = getFirstUUIDOrNull(entity, STRING_ID_FQN) + val studyEntityKeyId = getFirstUUIDOrNull(entity, OL_ID_FQN) + + val settings: MutableMap = mutableMapOf() + + val appComponents = appIds.map { appIdToComponentMapping.getValue(it) }.toMutableSet() + if (appIds.contains(SURVEY_APP_ID) && orgId != LEGACY_ORG_ID) { + appComponents.add(AppComponents.TIME_USE_DIARY) + } + settings[COMPONENTS_SETTING] = appComponents + settings[APP_FREQUENCY_SETTING] = if (orgId == RICE_UNIVERSITY_ORG_ID) AppUsageFrequency.HOURLY else AppUsageFrequency.DAILY + + return Study( + studyEntityKeyId = studyEntityKeyId!!, + studyId = studyId!!, + settings = settings + ) + } + + private fun getEntitySetIds(orgIds: Set): Map { + return orgIds.associateWith { orgId -> + when (orgId) { + LEGACY_ORG_ID -> LEGACY_STUDY_ENTITY_SET + else -> "chronicle_${orgId.toString().replace("-", "")}_studies" + } + }.mapValues { entitySetIdsByName.getValue(it.value) } + } + + private fun getFirstUUIDOrNull(entity: Map>, fqn: FullQualifiedName): UUID? { + return when (val string = getFirstValueOrNull(entity, fqn)) { + null -> null + else -> UUID.fromString(string) + } + } + + private fun getFirstValueOrNull(entity: Map>, fqn: FullQualifiedName): String? 
{ + entity[fqn]?.iterator()?.let { + if (it.hasNext()) return it.next().toString() + } + return null + } + + override fun getSupportedVersion(): Long { + return Version.V2021_07_23.value + } +} + +private enum class AppComponents { + CHRONICLE, + CHRONICLE_DATA_COLLECTION, + CHRONICLE_SURVEYS, + TIME_USE_DIARY +} + +private enum class AppUsageFrequency { + DAILY, + HOURLY +} \ No newline at end of file diff --git a/src/main/kotlin/com/openlattice/mechanic/upgrades/MigratePreprocessedData.kt b/src/main/kotlin/com/openlattice/mechanic/upgrades/MigratePreprocessedData.kt new file mode 100644 index 00000000..f2e1b443 --- /dev/null +++ b/src/main/kotlin/com/openlattice/mechanic/upgrades/MigratePreprocessedData.kt @@ -0,0 +1,398 @@ +package com.openlattice.mechanic.upgrades + +import com.geekbeast.postgres.streams.BasePostgresIterable +import com.geekbeast.postgres.streams.PreparedStatementHolderSupplier +import com.geekbeast.rhizome.configuration.RhizomeConfiguration +import com.openlattice.authorization.Principal +import com.openlattice.authorization.PrincipalType +import com.openlattice.data.requests.NeighborEntityDetails +import com.openlattice.data.storage.postgres.PostgresEntityDataQueryService +import com.openlattice.datastore.services.EntitySetManager +import com.openlattice.graph.PagedNeighborRequest +import com.openlattice.hazelcast.HazelcastMap +import com.openlattice.mechanic.Toolbox +import com.openlattice.organizations.roles.SecurePrincipalsManager +import com.openlattice.search.SearchService +import com.openlattice.search.requests.EntityNeighborsFilter +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import org.apache.olingo.commons.api.edm.FullQualifiedName +import org.slf4j.LoggerFactory +import java.sql.ResultSet +import java.time.OffsetDateTime +import java.util.* + +/** + * @author alfoncenzioka <alfonce@openlattice.com> + */ +class MigratePreprocessedData( + val toolbox: Toolbox, + private val rhizomeConfiguration: RhizomeConfiguration, + private val dataQueryService: PostgresEntityDataQueryService, + private val entitySetService: EntitySetManager, + private val searchService: SearchService, + private val principalService: SecurePrincipalsManager, +) : Upgrade { + + private val entitySetIds: Map = HazelcastMap.ENTITY_SETS.getMap(toolbox.hazelcast).associate { it.value.name to it.key } + private val appConfigs = HazelcastMap.APP_CONFIGS.getMap(toolbox.hazelcast) + + private val organizations = HazelcastMap.ORGANIZATIONS.getMap(toolbox.hazelcast) + + companion object { + private val logger = LoggerFactory.getLogger(MigratePreprocessedData::class.java) + + private const val SUPER_USER_PRINCIPAL_ID = "auth0|5ae9026c04eb0b243f1d2bb6" + + private val DATA_COLLECTION_APP_ID = UUID.fromString("c4e6d8fd-daf9-41e7-8c59-2a12c7ee0857") + private val LEGACY_ORG_ID = UUID.fromString("7349c446-2acc-4d14-b2a9-a13be39cff93") + + private const val CHRONICLE_APP_ES_PREFIX = "chronicle_" + private const val DATA_COLLECTION_APP_ES_PREFIX = "chronicle_data_collection_" + + private const val LEGACY_STUDIES_ES = "chronicle_study" + private const val LEGACY_RECORDED_BY_ES = "chronicle_recorded_by" + private const val LEGACY_PREPROCESSED_ES = "chronicle_preprocessed_app_data" + private const val LEGACY_PARTICIPATED_IN_ES = "chronicle_participated_in" + + + // collection template names + private const val PRE_PROCESSED_TEMPLATE = "preprocesseddata"; + private const val PARTICIPATED_IN_TEMPLATE = "participatedin" + private const val PARTICIPANTS_TEMPLATE = 
"participants" + private const val RECORDED_BY_TEMPLATE = "recordedby" + + // entity sets lookup name + private const val RECORDED_BY_ES = "recordedBy" + private const val PRE_PROCESSED_ES = "preprocessed" + private const val PARTICIPATED_IN_ES = "participatedIn" + private const val PARTICIPANTS_ES = "participants" + + // table columns + private const val PARTICIPANT_ID = "participant_id" + private const val STUDY_ID = "study_id" + private const val APP_LABEL = "app_label" + private const val DATE_TIME_START = "datetime_start" + private const val DATE_TIME_END = "datetime_end" + private const val APP_PACKAGE_NAME = "app_package_name" + private const val TIMEZONE = "timezone" + private const val RECORD_TYPE = "record_type" + private const val NEW_PERIOD = "new_period" + private const val NEW_APP = "new_app" + private const val DURATION = "duration_seconds" + private const val WARNING = "warning" + + private val PERSON_FQN = FullQualifiedName("nc.SubjectIdentification") + private val TITLE_FQN = FullQualifiedName("ol.title") + private val DATE_TIME_START_FQN = FullQualifiedName("ol.datetimestart") + private val DATE_TIME_END_FQN = FullQualifiedName("general.EndTime") + private val FULL_NAME_FQN = FullQualifiedName("general.fullname") + private val TIMEZONE_FQN = FullQualifiedName("ol.timezone") + private val RECORD_TYPE_FQN = FullQualifiedName("ol.recordtype") + private val NEW_PERIOD_FQN = FullQualifiedName("ol.newperiod") + private val DURATION_FQN = FullQualifiedName("general.Duration") + private val WARNING_FQN = FullQualifiedName("ol.warning") + private val NEW_APP_FQN = FullQualifiedName("ol.newapp") + + + private val CREATE_TABLE_SQL = """ + CREATE TABLE IF NOT EXISTS preprocessed_usage_events( + $STUDY_ID uuid not null, + $PARTICIPANT_ID text not null, + $APP_LABEL text, + $APP_PACKAGE_NAME text, + $DATE_TIME_START timestamp with time zone, + $DATE_TIME_END timestamp with time zone, + $TIMEZONE text, + $RECORD_TYPE text, + $NEW_PERIOD boolean, + $NEW_APP boolean, + $DURATION DOUBLE PRECISION , + $WARNING text + ) + """.trimIndent() + + private val PARAMS_BINDING = linkedSetOf( + STUDY_ID, + PARTICIPANT_ID, + APP_LABEL, + APP_PACKAGE_NAME, + DATE_TIME_START, + DATE_TIME_END, + TIMEZONE, + RECORD_TYPE, + NEW_PERIOD, + NEW_APP, + DURATION, + WARNING + ).joinToString { "?" } + + /** + * PreparedStatement bind order + * 1) legacy study id + * 2) participant_id + * 3) app label + * 4) appPackageName + * 5) dateStart + * 6) dateEnd, + * 7) timezone + * 8) recordType + * 9) newPeriod + * 10) newApp + * 11) duration + * 12) warning + */ + private val INSERT_SQL = """ + INSERT INTO preprocessed_usage_events values ($PARAMS_BINDING) + """.trimIndent() + } + + init { + getHikariDataSource().connection.createStatement().use { statement -> + statement.execute(CREATE_TABLE_SQL) + } + } + + override fun upgrade(): Boolean { + + val participants = BasePostgresIterable( + PreparedStatementHolderSupplier( + getHikariDataSource(), + "select * from (select * from participants_export union select * from missed_participants_fresh) AS participants" + ) {} + ) { + participant(it) + }.groupBy { it.organization_id } + + val orgIds = (appConfigs.keys.filter { it.appId == DATA_COLLECTION_APP_ID }.map { it.organizationId } + LEGACY_ORG_ID).toSet() + val principals = getChronicleSuperUserPrincipals() + val invalidOrgIds = orgIds.filter { !participants.keys.contains(it) } + logger.info("Organizations not found. 
Skipping: ${invalidOrgIds.map { organizations[it] }}") + + val hds = getHikariDataSource() + (orgIds - invalidOrgIds.toSet()).forEach { orgId -> + exportEntities( + hds, + orgId, + participants.getValue(orgId).associateBy { participant -> participant.participant_ek_id }, + principals + ) + } + + return true + } + + override fun getSupportedVersion(): Long { + return Version.V2021_07_23.value + } + + private fun writeEntities(entities: List, hds: HikariDataSource): Int { + return hds.connection.use { connection -> + try { + val wc = connection.prepareStatement(INSERT_SQL).use { ps -> + entities.forEach { + var index = 0 + ps.setObject(++index, it.study_id) + ps.setString(++index, it.participant_id) + ps.setString(++index, it.appLabel) + ps.setString(++index, it.packageName) + ps.setObject(++index, it.datetimeStart) + ps.setObject(++index, it.datetimeEnd) + ps.setString(++index, it.timezone) + ps.setString(++index, it.recordType) + ps.setBoolean(++index, it.newPeriod) + ps.setBoolean(++index, it.newApp) + ps.setObject(++index, it.duration) + ps.setString(++index, it.warning) + ps.addBatch() + } + ps.executeBatch().sum() + } + return@use wc + } catch (ex: Exception) { + logger.error("exception", ex) + throw ex + } + } + } + + private fun getHikariDataSource(): HikariDataSource { + val (hikariConfiguration) = rhizomeConfiguration.datasourceConfigurations["chronicle"]!! + val hc = HikariConfig(hikariConfiguration) + return HikariDataSource(hc) + } + + private fun participant(rs: ResultSet): ParticipantExport { + return ParticipantExport( + participant_ek_id = rs.getObject("participant_ek_id", UUID::class.java), + legacy_study_id = rs.getObject("legacy_study_id", UUID::class.java), + legacy_participant_id = rs.getString("legacy_participant_id"), + organization_id = rs.getObject("organization_id", UUID::class.java) + ) + } + + private fun exportEntities( + hds: HikariDataSource, + orgId: UUID, + participants: Map, + principals: Set + ) { + logger.info("getting preprocessed data entities for org $orgId") + val orgEntitySetIds = getOrgEntitySetNames(orgId) + + val participantEntitySetIds = when (orgId) { + LEGACY_ORG_ID -> { + val entitySetNames = participants.values.map { it.legacy_study_id }.map { studyId -> "chronicle_participants_$studyId" } + entitySetIds.filter { entitySetNames.contains(it.key) }.values.toSet() + } + else -> setOf(orgEntitySetIds.getValue(PARTICIPANTS_ES)) + } + if (participants.isEmpty()) { + logger.info("No participants found. Skipping org") + return + } + + val allIds = participants.keys.toMutableSet() + while (allIds.isNotEmpty()) { + val current = allIds.take(20).toSet() + logger.info("processing batch of ${current.size}. 
Remaining: ${(allIds - current).size}") + val participantNeighbors: Map> = getParticipantNeighbors( + entityKeyIds = current, + entitySetIds = orgEntitySetIds, + participantEntitySetIds = participantEntitySetIds, + principals = principals + ) + val entities = participantNeighbors.mapValues { + it.value.map { entityDetails -> getEntity(entityDetails.neighborDetails.get(), it.key, participants) } + }.values.flatten().filter { it.study_id != null && it.participant_id != null} + + logger.info("retrieved ${entities.size} preprocessed entities") + val written = writeEntities(entities, hds) + logger.info("exported $written entities to table") + + allIds -= current + } + } + + private fun getEntity( + entity: Map>, + participant_ek_id: UUID, + participants: Map + ): PreProcessedEntity { + val participant = participants[participant_ek_id] + + return PreProcessedEntity( + study_id = participant?.legacy_study_id, + participant_id = participant?.legacy_participant_id, + appLabel = getFirstValueOrNull(entity, TITLE_FQN), + packageName = getFirstValueOrNull(entity, FULL_NAME_FQN), + datetimeStart = getFirstValueOrNull(entity, DATE_TIME_START_FQN)?.let { OffsetDateTime.parse(it) }, + datetimeEnd = getFirstValueOrNull(entity, DATE_TIME_END_FQN)?.let { OffsetDateTime.parse(it) }, + timezone = getFirstValueOrNull(entity, TIMEZONE_FQN), + recordType = getFirstValueOrNull(entity, RECORD_TYPE_FQN), + newPeriod = getFirstValueOrNull(entity, NEW_PERIOD_FQN).toBoolean(), + duration = getFirstValueOrNull(entity, DURATION_FQN)?.toDouble(), + warning = getFirstValueOrNull(entity, WARNING_FQN), + newApp = getFirstValueOrNull(entity, NEW_APP_FQN).toBoolean() + ) + } + + private fun getLegacyParticipantEntitySetIds(): Set { + return entitySetIds.filter { it.key.startsWith("chronicle_participants_") }.map { it.value }.toSet() + } + + private fun getParticipantNeighbors( + entityKeyIds: Set, + entitySetIds: Map, + participantEntitySetIds: Set, + principals: Set + ): Map> { + val preprocessedEntitySetId = entitySetIds.getValue(PRE_PROCESSED_ES) + val participatedInEntitySetId = entitySetIds.getValue(PARTICIPATED_IN_ES) + val recordedByEntitySetId = entitySetIds.getValue(RECORDED_BY_ES) + + val filter = EntityNeighborsFilter( + entityKeyIds, + Optional.of(setOf(preprocessedEntitySetId)), + Optional.empty(), + Optional.of(setOf(recordedByEntitySetId, participatedInEntitySetId)) + ) + return searchService.executeEntityNeighborSearch( + participantEntitySetIds, + PagedNeighborRequest(filter), + principals + ).neighbors + } + + + // Returns participants in an org + private fun getOrgParticipants( + participantEntitySetIds: Set + ): Set { + return dataQueryService.getEntitiesWithPropertyTypeFqns( + participantEntitySetIds.associateWith { Optional.empty() }, + entitySetService.getPropertyTypesOfEntitySets(participantEntitySetIds), + mapOf(), + setOf(), + Optional.empty(), + false + ).keys + } + + + private fun getFirstValueOrNull(entity: Map>, fqn: FullQualifiedName): String? 
{ + entity[fqn]?.iterator()?.let { + if (it.hasNext()) return it.next().toString() + } + return null + } + + + private fun getOrgEntitySetNames(orgId: UUID): Map { + val entitySetNameByTemplateName = when (orgId) { + LEGACY_ORG_ID -> mapOf( + PRE_PROCESSED_ES to LEGACY_PREPROCESSED_ES, + PARTICIPATED_IN_ES to LEGACY_PARTICIPATED_IN_ES, + RECORDED_BY_ES to LEGACY_RECORDED_BY_ES + ) + else -> { + val orgIdToStr = orgId.toString().replace("-", "") + mapOf( + PRE_PROCESSED_ES to "$DATA_COLLECTION_APP_ES_PREFIX${orgIdToStr}_$PRE_PROCESSED_TEMPLATE", + RECORDED_BY_ES to "$DATA_COLLECTION_APP_ES_PREFIX${orgIdToStr}_$RECORDED_BY_TEMPLATE", + PARTICIPANTS_ES to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_${PARTICIPANTS_TEMPLATE}", + PARTICIPATED_IN_ES to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_$PARTICIPATED_IN_TEMPLATE", + ) + } + } + + return entitySetNameByTemplateName.mapValues { entitySetIds.getValue(it.value) } + } + + private fun getChronicleSuperUserPrincipals(): Set { + val securablePrincipal = principalService.getSecurablePrincipal(SUPER_USER_PRINCIPAL_ID) + return principalService.getAllPrincipals(securablePrincipal).map { it.principal }.toSet() + Principal(PrincipalType.USER, SUPER_USER_PRINCIPAL_ID) + } +} + +data class PreProcessedEntity( + val study_id: UUID?, + val participant_id: String?, + val appLabel: String?, + val datetimeStart: OffsetDateTime?, + val datetimeEnd: OffsetDateTime?, + val packageName: String?, + val timezone: String?, + val recordType: String?, + val newPeriod: Boolean, + val newApp: Boolean, + val duration: Double?, + val warning: String? +) + +data class ParticipantExport( + val participant_ek_id: UUID, + val legacy_study_id: UUID, + val legacy_participant_id: String, + val organization_id: UUID +) \ No newline at end of file diff --git a/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateTimeUseDiarySubmissions.kt b/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateTimeUseDiarySubmissions.kt new file mode 100644 index 00000000..86e1cf56 --- /dev/null +++ b/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateTimeUseDiarySubmissions.kt @@ -0,0 +1,497 @@ +package com.openlattice.mechanic.upgrades + +import com.geekbeast.mappers.mappers.ObjectMappers +import com.geekbeast.rhizome.configuration.RhizomeConfiguration +import com.openlattice.authorization.Principal +import com.openlattice.authorization.PrincipalType +import com.openlattice.data.requests.NeighborEntityDetails +import com.openlattice.data.storage.postgres.PostgresEntityDataQueryService +import com.openlattice.datastore.services.EntitySetManager +import com.openlattice.edm.EdmConstants +import com.openlattice.graph.PagedNeighborRequest +import com.openlattice.hazelcast.HazelcastMap +import com.openlattice.mechanic.Toolbox +import com.openlattice.organizations.roles.SecurePrincipalsManager +import com.openlattice.search.SearchService +import com.openlattice.search.requests.EntityNeighborsFilter +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import org.apache.olingo.commons.api.edm.FullQualifiedName +import org.slf4j.LoggerFactory +import java.time.OffsetDateTime +import java.util.* + +/** + * @author alfoncenzioka <alfonce@openlattice.com> + */ +class MigrateTimeUseDiarySubmissions( + val toolbox: Toolbox, + private val rhizomeConfiguration: RhizomeConfiguration, + private val dataQueryService: PostgresEntityDataQueryService, + private val entitySetService: EntitySetManager, + private val searchService: SearchService, + private val principalService: 
SecurePrincipalsManager, +) : Upgrade { + + private val entitySetIds: Map = HazelcastMap.ENTITY_SETS.getMap(toolbox.hazelcast).associate { it.value.name to it.key } + private val appConfigs = HazelcastMap.APP_CONFIGS.getMap(toolbox.hazelcast) + + var totalSubmissionEntities: Int = 0 //keep track of number of submission entities + + companion object { + private val logger = LoggerFactory.getLogger(MigrateTimeUseDiarySubmissions::class.java) + private val mapper = ObjectMappers.getJsonMapper() + + private const val SUPER_USER_PRINCIPAL_ID = "auth0|5ae9026c04eb0b243f1d2bb6" + + private val SURVEYS_APP_ID = UUID.fromString("bb44218b-515a-4314-b955-df2c991b2575") + + private const val CHRONICLE_APP_ES_PREFIX = "chronicle_" + private const val SURVEYS_APP_ES_PREFIX = "chronicle_surveys_" + + // collection template names + private const val STUDIES_TEMPLATE = "studies" + private const val PARTICIPATED_IN_TEMPLATE = "participatedin" + private const val PARTICIPANTS_TEMPLATE = "participants" + private const val SUBMISSION_TEMPLATE = "submission" + private const val QUESTION_TEMPLATE = "question" + private const val ANSWER_TEMPLATE = "answer" + private const val TIME_RANGE_TEMPLATE = "timerange" + private const val REGISTERED_FOR_TEMPLATE = "registeredfor" + private const val RESPONDS_WITH_TEMPLATE = "respondswith" + private const val ADDRESSES_TEMPLATE = "addresses" + + // entity sets lookup name + private const val STUDIES_ES = "studies" + private const val PARTICIPATED_IN_ES = "participatedIn" + private const val PARTICIPANTS_ES = "participants" + private const val TIME_RANGE_ES = "timeRange" + private const val SUBMISSION_ES = "submission" + private const val QUESTION_ES = "question" + private const val ANSWER_ES = "answer" + private const val REGISTERED_FOR_ES = "registeredFor" + private const val RESPONDED_WITH_ES = "respondedWith" + private const val ADDRESSES_ES = "addresses" + + private val OL_ID_FQN = EdmConstants.ID_FQN + private val STRING_ID_FQN = FullQualifiedName("general.stringid") + private val PERSON_FQN = FullQualifiedName("nc.SubjectIdentification") + private val TITLE_FQN = FullQualifiedName("ol.title") // question title + private val DATE_TIME_START_FQN = FullQualifiedName("ol.datetimestart") //timerange + private val DATE_TIME_END_FQN = FullQualifiedName("ol.datetimeend") + private val VALUES_FQN = FullQualifiedName("ol.values") + private val FULL_NAME_FQN = FullQualifiedName("general.fullname") + private val DATE_TIME_FQN = FullQualifiedName("ol.datetime") + private val ID_FQN = FullQualifiedName("ol.id") // question code + + // column names + private const val STUDY_ID = "study_id" + private const val PARTICIPANT_ID = "participant_id" + private const val ORGANIZATION_ID = "organization_id" + private const val SUBMISSION = "submission" + private const val SUBMISSION_ID = "submission_id" //not unique for each row + private const val SUBMISSION_DATE = "submission_date" + + private const val TABLE_NAME = "migrate_time_use_diary" + + private val COLUMNS = linkedSetOf( + STUDY_ID, + ORGANIZATION_ID, + PARTICIPANT_ID, + SUBMISSION_ID, + SUBMISSION, + SUBMISSION_DATE + ).joinToString { it } + + private val CREATE_TABLE_SQL = """ + CREATE TABLE IF NOT EXISTS $TABLE_NAME( + $STUDY_ID uuid not null, + $ORGANIZATION_ID uuid not null, + $SUBMISSION_ID uuid not null, + $PARTICIPANT_ID text not null, + $SUBMISSION jsonb not null, + $SUBMISSION_DATE timestamp with time zone not null, + PRIMARY KEY($SUBMISSION_ID, $SUBMISSION_DATE) + ) + """.trimIndent() + + /** + * PreparedStatement bind 
order + * 1) studyId, + * 2) orgId + * 3) participantId + * 4) submissionId + * 5) submission + * 6) submissionDate + */ + private val INSERT_INTO_TABLE_SQL = """ + INSERT INTO $TABLE_NAME ($COLUMNS) VALUES (?, ?, ?, ?, ?::jsonb, ?) + """.trimIndent() + } + + init { + getHikariDataSource().connection.createStatement().use { statement -> + statement.execute(CREATE_TABLE_SQL) + } + } + + private fun getHikariDataSource(): HikariDataSource { + val (hikariConfiguration) = rhizomeConfiguration.datasourceConfigurations["chronicle"]!! + val hc = HikariConfig(hikariConfiguration) + return HikariDataSource(hc) + } + + private fun getOrgEntitySetNames(orgId: UUID): Map { + val orgIdToStr = orgId.toString().replace("-", "") + val entitySetNameByTemplateName = mapOf( + STUDIES_ES to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_$STUDIES_TEMPLATE", + PARTICIPATED_IN_ES to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_$PARTICIPATED_IN_TEMPLATE", + PARTICIPANTS_ES to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_${PARTICIPANTS_TEMPLATE}", + ADDRESSES_ES to "$SURVEYS_APP_ES_PREFIX${orgIdToStr}_${ADDRESSES_TEMPLATE}", + RESPONDED_WITH_ES to "$SURVEYS_APP_ES_PREFIX${orgIdToStr}_${RESPONDS_WITH_TEMPLATE}", + REGISTERED_FOR_ES to "$SURVEYS_APP_ES_PREFIX${orgIdToStr}_${REGISTERED_FOR_TEMPLATE}", + ANSWER_ES to "$SURVEYS_APP_ES_PREFIX${orgIdToStr}_${ANSWER_TEMPLATE}", + QUESTION_ES to "$SURVEYS_APP_ES_PREFIX${orgIdToStr}_${QUESTION_TEMPLATE}", + SUBMISSION_ES to "$SURVEYS_APP_ES_PREFIX${orgIdToStr}_${SUBMISSION_TEMPLATE}", + TIME_RANGE_ES to "$SURVEYS_APP_ES_PREFIX${orgIdToStr}_${TIME_RANGE_TEMPLATE}" + ) + + return entitySetNameByTemplateName.filter { entitySetIds.keys.contains(it.value) }.mapValues { entitySetIds.getValue(it.value) } + } + + private fun getChronicleSuperUserPrincipals(): Set { + val securablePrincipal = principalService.getSecurablePrincipal(SUPER_USER_PRINCIPAL_ID) + return principalService.getAllPrincipals(securablePrincipal).map { it.principal }.toSet() + Principal(PrincipalType.USER, SUPER_USER_PRINCIPAL_ID) + } + + private fun processOrganization(orgId: UUID, principals: Set): List { + logger.info("Processing org $orgId") + + + val orgEntitySetIds = getOrgEntitySetNames(orgId) + + // get all participants in studies + val participants: Set = getOrgParticipants( + entitySetIds = orgEntitySetIds, + ) + + if (participants.isEmpty()) { + logger.info("No participants found in org $orgId") + return listOf() + } + + // participant -> neighbor entity set id -> [neighbors] + val participantNeighbors: Map>> = getParticipantNeighbors( + entityKeyIds = participants.map { it.id }.toSet(), + entitySetIds = orgEntitySetIds, + principals = principals + ) + + val studiesByParticipantId: Map = participantNeighbors + .mapValues { it.value.getOrDefault(orgEntitySetIds.getValue(STUDIES_ES), listOf()).first() } + .mapValues { getStudyEntity(it.value.neighborId.get(), it.value.neighborDetails.get()) } + logger.info("Org studies: ${studiesByParticipantId.values.toSet()}") + + // unique submission. 
Each submission entity is an entry in the tud submissions table + val submissionsById = participantNeighbors + .values.map { it.getOrDefault(orgEntitySetIds.getValue(SUBMISSION_ES), listOf()) }.flatten().associateBy { it.neighborId.get() } + totalSubmissionEntities += submissionsById.keys.size + + if (submissionsById.isEmpty()) { + logger.info("no submissions found") + return listOf() + } + + val answersById = participantNeighbors + .values.map { it.getOrDefault(orgEntitySetIds.getValue(ANSWER_ES), listOf()) }.flatten().associateBy { it.neighborId.get() } + if (answersById.isEmpty()) { + logger.warn("unexpected. submission should have answer entities") + return listOf() + } + + // answerId -> neighbor esid -> [neighbors] + val answerNeighbors = getAnswerNeighbors( + entityKeyIds = answersById.keys, + entitySetIds = orgEntitySetIds, + principals = principals + ) + + // submissionId -> [answer] + val answersBySubmissionId = getAnswersBySubmissionId( + entityKeyIds = submissionsById.keys, + entitySetIds = orgEntitySetIds, + principals = principals + ) + + val participantBySubmissionId = participantNeighbors + .map { it.value.getOrDefault(orgEntitySetIds.getValue(SUBMISSION_ES), setOf()).associate { neighbor -> neighbor.neighborId.get() to it.key } } + .asSequence() + .flatMap { it.asSequence() } + .groupBy({ it.key }, { it.value }) + .mapValues { it.value.first() } + + return answersBySubmissionId.map { (submissionId, answerEntities) -> + getSubmissionEntity( + orgId = orgId, + submissionId = submissionId, + answerEntities = answerEntities, + participantsById = participants.associateBy { it.id }, + studiesByParticipantId = studiesByParticipantId, + participantBySubmissionId = participantBySubmissionId, + answerNeighbors = answerNeighbors.mapValues { answers -> answers.value.mapValues { neighbors -> neighbors.value.first() } }, + submissionEntity = submissionsById.getValue(submissionId).neighborDetails.get(), + entitySetIds = orgEntitySetIds + ) + } + } + + private fun getSubmissionEntity( + orgId: UUID, + submissionId: UUID, + answerEntities: List, + participantsById: Map, + studiesByParticipantId: Map, + participantBySubmissionId: Map, + answerNeighbors: Map>, + submissionEntity: Map>, + entitySetIds: Map + ): SubmissionEntity { + val dateSubmitted = getFirstValueOrNull(submissionEntity, DATE_TIME_FQN) + + val participantId = participantBySubmissionId.getValue(submissionId) + val participant = participantsById.getValue(participantId) + + val responses = answerEntities.map { answer -> + getResponse( + answerId = answer.neighborId.get(), + answerEntity = answer.neighborDetails.get(), + answerNeighbors = answerNeighbors, + entitySetIds = entitySetIds + ) + } + return SubmissionEntity( + orgId = orgId, + submissionId = submissionId, + date = dateSubmitted?.let { OffsetDateTime.parse(it) }, + studyId = studiesByParticipantId.getValue(participantId).studyId, + participantId = participant.participantId!!, //force unwrapping is safe because we have already filtered out "bad" participant entities + responses = responses.filter { it.code != null && it.question != null }.toSet() + ) + } + + private fun getResponse( + answerId: UUID, + answerEntity: Map>, + answerNeighbors: Map>, + entitySetIds: Map + ): ResponseEntity { + + val responses = getAllValuesOrNull(answerEntity, VALUES_FQN) + val questionEntity = answerNeighbors.getValue(answerId).getValue(entitySetIds.getValue(QUESTION_ES)) + // time range is optional + val timeRangeEntity = 
answerNeighbors.getValue(answerId)[entitySetIds.getValue(TIME_RANGE_ES)] + val startDateTime = timeRangeEntity?.let { getFirstValueOrNull(it.neighborDetails.get(), DATE_TIME_START_FQN) } + val endDateTime = timeRangeEntity?.let { getFirstValueOrNull(it.neighborDetails.get(), DATE_TIME_END_FQN) } + + + return ResponseEntity( + code = getFirstValueOrNull(questionEntity.neighborDetails.get(), ID_FQN), + question = getFirstValueOrNull(questionEntity.neighborDetails.get(), TITLE_FQN), + response = responses, + startDateTime = startDateTime?.let { OffsetDateTime.parse(it) }, + endDateTime = endDateTime?.let { OffsetDateTime.parse(it) } + ) + } + + private fun getAnswersBySubmissionId( + entityKeyIds: Set, + entitySetIds: Map, + principals: Set + ): Map> { + val registeredForEntitySetId = entitySetIds.getValue(REGISTERED_FOR_ES) + val answerEntitySetId = entitySetIds.getValue(ANSWER_ES) + + val filter = EntityNeighborsFilter( + entityKeyIds, + Optional.of(setOf(answerEntitySetId)), + Optional.empty(), + Optional.of(setOf(registeredForEntitySetId)) + ) + + return searchService.executeEntityNeighborSearch( + setOf(entitySetIds.getValue(SUBMISSION_ES)), + PagedNeighborRequest(filter), + principals + ).neighbors + } + + private fun getAnswerNeighbors( + entityKeyIds: Set, + entitySetIds: Map, + principals: Set + ): Map>> { + val registeredForEntitySetId = entitySetIds.getValue(REGISTERED_FOR_ES) + val submissionEntitySetId = entitySetIds.getValue(SUBMISSION_ES) + val timeRangeEntitySetId = entitySetIds.getValue(TIME_RANGE_ES) + val questionEntitySetId = entitySetIds.getValue(QUESTION_ES) + val addressesEntitySetId = entitySetIds.getValue(ADDRESSES_ES) + + val filter = EntityNeighborsFilter( + entityKeyIds, + Optional.empty(), + Optional.of(setOf(submissionEntitySetId, timeRangeEntitySetId, questionEntitySetId)), + Optional.of(setOf(registeredForEntitySetId, addressesEntitySetId)) + ) + return searchService.executeEntityNeighborSearch( + setOf(entitySetIds.getValue(ANSWER_ES)), + PagedNeighborRequest(filter), + principals + ).neighbors.mapValues { it.value.groupBy { neighbors -> neighbors.neighborEntitySet.get().id } } + } + + private fun getParticipantNeighbors( + entityKeyIds: Set, + entitySetIds: Map, + principals: Set + ): Map>> { + val submissionEntitySetId = entitySetIds.getValue(SUBMISSION_ES) + val respondsWithEntitySetId = entitySetIds.getValue(RESPONDED_WITH_ES) + val answerEntitySetId = entitySetIds.getValue(ANSWER_ES) + val participatedInEntitySetId = entitySetIds.getValue(PARTICIPATED_IN_ES) + val studiesEntitySetId = entitySetIds.getValue(STUDIES_ES) + + val filter = EntityNeighborsFilter( + entityKeyIds, + Optional.empty(), + Optional.of(setOf(submissionEntitySetId, answerEntitySetId, studiesEntitySetId)), + Optional.of(setOf(respondsWithEntitySetId, participatedInEntitySetId)) + ) + return searchService.executeEntityNeighborSearch( + setOf(entitySetIds.getValue(PARTICIPANTS_ES)), + PagedNeighborRequest(filter), + principals + ).neighbors.mapValues { it.value.groupBy { neighbors -> neighbors.neighborEntitySet.get().id } } + } + + private fun getOrgStudies(entitySetId: UUID): Map { + return dataQueryService.getEntitiesWithPropertyTypeFqns( + mapOf(entitySetId to Optional.empty()), + entitySetService.getPropertyTypesOfEntitySets(setOf(entitySetId)), + mapOf(), + setOf(), + Optional.empty(), + false + ) + .filter { getFirstUUIDOrNull(it.value, STRING_ID_FQN) != null } + .mapValues { getStudyEntity(it.key, it.value) } + } + + private fun getStudyEntity(studyEntityKeyId: UUID, entity: Map>): 
Study { + val title = getFirstValueOrNull(entity, FULL_NAME_FQN) + val studyId = getFirstUUIDOrNull(entity, STRING_ID_FQN) + return Study(studyEntityKeyId, studyId!!, title = title) + } + + // Returns a mapping from studyEntityKeyId to list of participants + private fun getOrgParticipants( + entitySetIds: Map, + ): Set { + return dataQueryService.getEntitiesWithPropertyTypeFqns( + mapOf(entitySetIds.getValue(PARTICIPANTS_ES) to Optional.empty()), + entitySetService.getPropertyTypesOfEntitySets(setOf(entitySetIds.getValue(PARTICIPANTS_ES))), + mapOf(), + setOf(), + Optional.empty(), + false + ).mapValues { getParticipantEntity(it.key, it.value) }.values.filter { it.participantId != null }.toSet() + + } + + private fun getParticipantEntity(entityKeyId: UUID, entity: Map>): Participant { + val participantId = getFirstValueOrNull(entity, PERSON_FQN) + return Participant( + id = entityKeyId, + participantId = participantId, + studyEntityKeyId = entityKeyId + ) + } + + private fun getFirstUUIDOrNull(entity: Map>, fqn: FullQualifiedName): UUID? { + return when (val string = getFirstValueOrNull(entity, fqn)) { + null -> null + else -> UUID.fromString(string) + } + } + + private fun getAllValuesOrNull(entity: Map>, fqn: FullQualifiedName): Set { + entity[fqn]?.let { it -> + return it.mapNotNull { it.toString() }.toSet() + } + return setOf() + } + + private fun getFirstValueOrNull(entity: Map>, fqn: FullQualifiedName): String? { + entity[fqn]?.iterator()?.let { + if (it.hasNext()) return it.next().toString() + } + return null + } + + private fun writeEntitiesToTable(entities: List): Int { + val hds = getHikariDataSource() + + return hds.connection.use { connection -> + try { + val wc = connection.prepareStatement(INSERT_INTO_TABLE_SQL).use { ps -> + entities.forEach { + var index = 0 + ps.setObject(++index, it.studyId) + ps.setObject(++index, it.orgId) + ps.setString(++index, it.participantId) + ps.setObject(++index, it.submissionId) + ps.setString(++index, mapper.writeValueAsString(it.responses)) + ps.setObject(++index, it.date) + ps.addBatch() + } + ps.executeBatch().sum() + } + return@use wc + } catch (ex: Exception) { + throw ex + } + + } + } + + override fun upgrade(): Boolean { + val superUserPrincipals = getChronicleSuperUserPrincipals() + + val orgIds = appConfigs.keys.filter { it.appId == SURVEYS_APP_ID }.map { it.organizationId }.toSet() + val entities = orgIds.map { processOrganization(it, superUserPrincipals) }.flatten() + val written = writeEntitiesToTable(entities) + logger.info("Exported $written entities to $TABLE_NAME") + logger.info("Actual number of entities found in all submission entity sets: $totalSubmissionEntities") + return true + } + + override fun getSupportedVersion(): Long { + return Version.V2021_07_23.value + } +} + +private data class ResponseEntity( + val code: String?, + val question: String?, + val response: Set, + val startDateTime: OffsetDateTime?, + val endDateTime: OffsetDateTime? 
+) + +private data class SubmissionEntity( + val orgId: UUID, + val studyId: UUID, + val submissionId: UUID, + val date: OffsetDateTime?, + val participantId: String, + val responses: Set +) \ No newline at end of file diff --git a/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateTimeUseDiarySummarizedData.kt b/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateTimeUseDiarySummarizedData.kt new file mode 100644 index 00000000..d30cada5 --- /dev/null +++ b/src/main/kotlin/com/openlattice/mechanic/upgrades/MigrateTimeUseDiarySummarizedData.kt @@ -0,0 +1,208 @@ +package com.openlattice.mechanic.upgrades + +import com.geekbeast.mappers.mappers.ObjectMappers +import com.geekbeast.rhizome.configuration.RhizomeConfiguration +import com.openlattice.authorization.Principal +import com.openlattice.authorization.PrincipalType +import com.openlattice.data.requests.NeighborEntityDetails +import com.openlattice.data.storage.postgres.PostgresEntityDataQueryService +import com.openlattice.datastore.services.EntitySetManager +import com.openlattice.graph.PagedNeighborRequest +import com.openlattice.hazelcast.HazelcastMap +import com.openlattice.mechanic.Toolbox +import com.openlattice.organizations.roles.SecurePrincipalsManager +import com.openlattice.search.SearchService +import com.openlattice.search.requests.EntityNeighborsFilter +import com.zaxxer.hikari.HikariConfig +import com.zaxxer.hikari.HikariDataSource +import org.apache.olingo.commons.api.edm.FullQualifiedName +import org.slf4j.LoggerFactory +import java.util.* + +/** + * @author alfoncenzioka <alfonce@openlattice.com> + */ +class MigrateTimeUseDiarySummarizedData( + toolbox: Toolbox, + private val rhizomeConfiguration: RhizomeConfiguration, + private val principalService: SecurePrincipalsManager, + private val searchService: SearchService, + private val dataQueryService: PostgresEntityDataQueryService, + private val entitySetService: EntitySetManager +) : Upgrade{ + private val entitySetIds: Map = HazelcastMap.ENTITY_SETS.getMap(toolbox.hazelcast).associate { it.value.name to it.key } + private val appConfigs = HazelcastMap.APP_CONFIGS.getMap(toolbox.hazelcast) + + companion object { + private val logger = LoggerFactory.getLogger(MigrateTimeUseDiarySummarizedData::class.java) + private val mapper = ObjectMappers.getJsonMapper() + + private const val SUPER_USER_PRINCIPAL_ID = "auth0|5ae9026c04eb0b243f1d2bb6" + private val SURVEYS_APP_ID = UUID.fromString("bb44218b-515a-4314-b955-df2c991b2575") + + private const val SURVEYS_APP_ES_PREFIX = "chronicle_surveys_" + + private const val SUBMISSION_TEMPLATE = "submission" + private const val SUMMARY_TEMPLATE = "summaryset" + private const val REGISTERED_FOR_TEMPLATE = "registeredfor" + + private const val SUBMISSION_ES = "submission" + private const val SUMMARY_ES = "summary" + private const val REGISTERED_FOR_ES = "registeredFor" + + private val VARIABLE_FQN = FullQualifiedName("ol.variable") + private val VALUES_FQN = FullQualifiedName("ol.values") + + private const val TABLE_NAME = "migrate_time_use_diary_summary" + + private val CREATE_TABLE_SQL = """ + CREATE TABLE IF NOT EXISTS $TABLE_NAME( + submission_id uuid not null, + data jsonb not null, + PRIMARY KEY (submission_id) + ) + """.trimIndent() + + private val INSERT_INTO_TABLE_SQL = """ + INSERT INTO $TABLE_NAME values(?, ?::jsonb) ON CONFLICT DO NOTHING + """.trimIndent() + } + + + init { + getHikariDataSource().connection.createStatement().use { statement -> + statement.execute(CREATE_TABLE_SQL) + } + } + + override fun 
upgrade(): Boolean {
+        val orgIds = appConfigs.keys.filter { it.appId == SURVEYS_APP_ID }.map { it.organizationId }.toSet()
+        val principals = getChronicleSuperUserPrincipals()
+        val entities = orgIds.map { getEntitiesForOrg(it, principals) }.flatten().toSet()
+
+        val written = writeEntitiesToTable(entities)
+        logger.info("Wrote $written entities to table")
+        return true
+    }
+
+    override fun getSupportedVersion(): Long {
+        return Version.V2021_07_23.value
+    }
+
+    private fun writeEntitiesToTable(entities: Set<SummarizedDataSubmissionEntity>): Int {
+        val hds = getHikariDataSource()
+        return hds.connection.use { connection ->
+            val wc = connection.prepareStatement(INSERT_INTO_TABLE_SQL).use { ps ->
+                entities.forEach {
+                    ps.setObject(1, it.submissionId)
+                    ps.setString(2, mapper.writeValueAsString(it.entities))
+                    ps.addBatch()
+                }
+                ps.executeBatch().sum()
+            }
+            return@use wc
+        }
+    }
+
+
+    private fun getEntitiesForOrg(orgId: UUID, principals: Set<Principal>): Set<SummarizedDataSubmissionEntity> {
+        logger.info("processing org $orgId")
+        val entitySets = getOrgEntitySetNames(orgId)
+        logger.info("entity sets: $entitySets")
+
+
+        val submissionEntitySetId = entitySets[SUBMISSION_ES]
+        val registeredForEntitySetId = entitySets[REGISTERED_FOR_ES]
+        val summaryEntitySetId = entitySets[SUMMARY_ES]
+
+        if (submissionEntitySetId == null || registeredForEntitySetId == null || summaryEntitySetId == null) {
+            logger.info("submission: {}, registered_for: {}, summary: {}", submissionEntitySetId, registeredForEntitySetId, summaryEntitySetId)
+            return setOf()
+        }
+
+        val submissionIds = dataQueryService.getEntitiesWithPropertyTypeFqns(
+            mapOf(submissionEntitySetId to Optional.empty()),
+            entitySetService.getPropertyTypesOfEntitySets(setOf(submissionEntitySetId)),
+            mapOf(),
+            setOf(),
+            Optional.empty(),
+            false
+        ).keys
+
+        if (submissionIds.isEmpty()) {
+            logger.info("no submission entities found")
+            return setOf()
+        }
+
+        // get entities from summarized entity set associated with submission ids
+        val filter = EntityNeighborsFilter(
+            submissionIds,
+            Optional.of(setOf(summaryEntitySetId)),
+            Optional.empty(),
+            Optional.of(setOf(registeredForEntitySetId))
+        )
+        val searchResult = searchService.executeEntityNeighborSearch(
+            setOf(submissionEntitySetId),
+            PagedNeighborRequest(filter),
+            principals
+        ).neighbors.mapValues { getSummaryEntityForSubmission(it.key, it.value) }
+
+        return searchResult.values.toSet()
+    }
+
+    private fun getFirstValueOrNull(entity: Map<FullQualifiedName, Set<Any>>, fqn: FullQualifiedName): String?
{ + entity[fqn]?.iterator()?.let { + if (it.hasNext()) return it.next().toString() + } + return null + } + + private fun getSummaryEntityForSubmission(submissionId: UUID, neighbors: List): SummarizedDataSubmissionEntity { + val values = neighbors.map { + val entity = it.neighborDetails.get() + SummarizedEntity( + variable = getFirstValueOrNull(entity, VARIABLE_FQN), + value = getFirstValueOrNull(entity, VALUES_FQN) + ) + }.filter { it.value != null && it.variable != null }.toSet() + + return SummarizedDataSubmissionEntity( + submissionId = submissionId, + entities = values, + ) + } + + private fun getOrgEntitySetNames(orgId: UUID): Map { + val orgIdToStr = orgId.toString().replace("-", "") + val entitySetNameByTemplateName = mapOf( + REGISTERED_FOR_ES to "$SURVEYS_APP_ES_PREFIX${orgIdToStr}_${REGISTERED_FOR_TEMPLATE}", + SUBMISSION_ES to "$SURVEYS_APP_ES_PREFIX${orgIdToStr}_${SUBMISSION_TEMPLATE}", + SUMMARY_ES to "$SURVEYS_APP_ES_PREFIX${orgIdToStr}_${SUMMARY_TEMPLATE}" + ) + + return entitySetNameByTemplateName.filter { entitySetIds.keys.contains(it.value) }.mapValues { entitySetIds.getValue(it.value) } + } + + + private fun getHikariDataSource(): HikariDataSource { + val (hikariConfiguration) = rhizomeConfiguration.datasourceConfigurations["chronicle"]!! + val hc = HikariConfig(hikariConfiguration) + return HikariDataSource(hc) + } + + private fun getChronicleSuperUserPrincipals(): Set { + val securablePrincipal = principalService.getSecurablePrincipal(SUPER_USER_PRINCIPAL_ID) + return principalService.getAllPrincipals(securablePrincipal).map { it.principal }.toSet() + Principal(PrincipalType.USER, SUPER_USER_PRINCIPAL_ID) + } + +} + +private data class SummarizedEntity( + val variable: String?, + val value: String? +) + +private data class SummarizedDataSubmissionEntity( + val submissionId: UUID, + val entities: Set, +) diff --git a/src/main/kotlin/com/openlattice/mechanic/upgrades/V3StudyMigrationUpgrade.kt b/src/main/kotlin/com/openlattice/mechanic/upgrades/V3StudyMigrationUpgrade.kt index 2b22c43d..789df70e 100644 --- a/src/main/kotlin/com/openlattice/mechanic/upgrades/V3StudyMigrationUpgrade.kt +++ b/src/main/kotlin/com/openlattice/mechanic/upgrades/V3StudyMigrationUpgrade.kt @@ -2,6 +2,8 @@ package com.openlattice.mechanic.upgrades import com.geekbeast.rhizome.configuration.RhizomeConfiguration import com.hazelcast.query.Predicates +import com.openlattice.authorization.Principal +import com.openlattice.authorization.PrincipalType import com.openlattice.data.storage.MetadataOption import com.openlattice.data.storage.postgres.PostgresEntityDataQueryService import com.openlattice.edm.EdmConstants.Companion.LAST_WRITE_FQN @@ -32,6 +34,7 @@ class V3StudyMigrationUpgrade( private val searchService: SearchService ): Upgrade { + private val SUPER_USER_PRINCIPAL_ID = "auth0|5ae9026c04eb0b243f1d2bb6" private val logger = LoggerFactory.getLogger(V3StudyMigrationUpgrade::class.java) private val propertyTypes = HazelcastMap.PROPERTY_TYPES.getMap(toolbox.hazelcast) @@ -254,8 +257,8 @@ class V3StudyMigrationUpgrade( val filter = EntityNeighborsFilter(setOf(studyEkid), Optional.of(orgMaybeParticipantEntitySetIds), Optional.of(orgStudyEntitySetIds), Optional.empty()) - val chronicleSuperUserSecurablePrincipal = principalService.getSecurablePrincipal("") - val chronicleSuperUserPrincipals = principalService.getAllPrincipals(chronicleSuperUserSecurablePrincipal).map { it.principal }.toSet() + val chronicleSuperUserSecurablePrincipal = principalService.getSecurablePrincipal(SUPER_USER_PRINCIPAL_ID) + 
val chronicleSuperUserPrincipals = principalService.getAllPrincipals(chronicleSuperUserSecurablePrincipal).map { it.principal }.toSet() + Principal(PrincipalType.USER, SUPER_USER_PRINCIPAL_ID)
         logger.info("chronicle super user principals ${chronicleSuperUserPrincipals.size} $chronicleSuperUserPrincipals")
         // get all participants for the study
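         // The extra USER principal is appended above presumably because getAllPrincipals(securablePrincipal)
         // resolves only the roles and organization principals granted to the super user, not the user's own
         // principal; the combined set is then used for the subsequent neighbor search so it runs with the
         // Chronicle super user's permissions (this reading of the intent is an assumption, not confirmed here).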