-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
migrate app usage survey data to v3 #162
Open
anzioka
wants to merge
107
commits into
develop
Choose a base branch
from
task/LATTICE-3024-migrate-app-usage-survey
base: develop
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 3 commits
Commits
Show all changes
107 commits
Select commit
Hold shift + click to select a range
1324a47
migrate app usage survey data to v3
anzioka 1b51a55
remove unused imports
anzioka 0357f19
rename
anzioka 6c99baa
use chronicle super user principals for data query
anzioka d458003
create table if not exists
anzioka d451961
remove user ids
anzioka 83637c5
fix gradle/aws
anzioka d4eecf0
super user ids
anzioka ebbc76a
Merge branch 'develop' into task/LATTICE-3024-migrate-app-usage-survey
anzioka 8ff714d
use principalService
anzioka 720ccbf
fix user id
anzioka 687cd35
fix gradle
anzioka 59bd08d
undo
anzioka 27f5cca
filter out nulls
anzioka e90c741
check authorization
anzioka b9b50e0
handle case where there multiple ol.title values
anzioka 09838eb
write to chronicle
anzioka bb04632
mechanic job to export participant stats
anzioka dcbb677
search serviceuse principals with admin role
anzioka 2b64e5c
guard against nulls
anzioka 6f83c87
fix NPE
anzioka 2613d7c
check for duplicates
anzioka e7fb2c5
assert
anzioka 506200b
remove unneeded
anzioka 689ff03
format
anzioka 2ace5e0
assert
anzioka 13fa484
fix stuff
anzioka ee4efd5
check for duplicates
anzioka 0c2dd18
use set
anzioka 9d9fdf2
change col name
anzioka bf8f630
log stuff
anzioka 1a261b7
permission stuff
anzioka bad2f0e
resolve duplicates
anzioka 32caaca
super user
anzioka 33d5aa2
unnecessary filtering
anzioka 8d545aa
super user principal
anzioka 21c3830
migrate org settings to studies
anzioka 1db1937
use chronicle datasource
anzioka ee3ef19
remove unused
anzioka 196364c
remove unnecessary
anzioka 2e65736
use principal type user
anzioka f8967e6
optional might not exist
anzioka 0c23f65
clean up stuff
anzioka 4f63aef
fix sql
anzioka 6810be1
fix sql
anzioka 17b59e3
study id might be null
anzioka 5b169f5
store legacy study id
anzioka 6d22998
write study ekid
anzioka aa0074b
fix create table sql
anzioka 83cf4ff
change data type
anzioka 1f6209a
rename
anzioka d3dd320
fix insert
anzioka 7cd62de
fix table definition
anzioka 6147bc1
migrate time use diary submisssions
anzioka 95865df
add bean to pod
anzioka d4e5828
fix
anzioka 6f3ec4b
add time use diary component
anzioka de04d28
duzz
anzioka d694fcc
Merge branch 'develop' into task/migrate-tud-submissions-v2
anzioka 1aa8dc4
export summarized time use diary data
anzioka 010962a
add to pod
anzioka ab14af5
add logging
anzioka 7d79cc5
fix
anzioka e057a62
log dates
anzioka a48e61f
missing fqn from association entity
anzioka 2e5431f
check for nulls
anzioka b9445bd
fix filtered search
anzioka 4abdb82
fix sql
anzioka f39d4cf
import preprocessed data
anzioka 5ec9461
fix sql
anzioka fba0e09
fix
anzioka a4cfe94
type double
anzioka c688aa2
nullable
anzioka ee58cf1
clean up
anzioka 9e27d88
whattt
anzioka 14efcff
more npe
anzioka 9c6d73b
fix
anzioka d58694a
filter out nulls
anzioka 8bd24b7
Merge branch 'develop' into task/LATTICE-3024-migrate-app-usage-survey
anzioka 0c4f398
Merge branch 'task/migrate-tud-summarized-data' into task/LATTICE-302…
anzioka f194a4e
Merge branch 'task/LATTICE-3061-export-org-settings-to-v3_studies' in…
anzioka a1c4d4e
rename
anzioka 5540987
Merge branch 'task/LATTICE-3072-migrate-participant-stats' into task/…
anzioka 677cee8
Merge branch 'task/migrate-tud-submissions-v2' into task/LATTICE-3024…
anzioka c41e97a
fix conflicts
anzioka 89998ee
rename
anzioka fbba417
rename tables
anzioka da96e63
table DNE
anzioka 62a4c20
write users to chronicle
anzioka 2f90771
chronicle super user principal
anzioka 1f2fa8d
conflict
anzioka 3ad03cf
batch processing
anzioka a8e618a
log
anzioka 5964fa0
batch size
anzioka 33fbd31
log
anzioka c4f9bd8
log
anzioka 535ffbd
filter out invalid configs
anzioka 7c96721
invalid stuff
anzioka e9dd402
log
anzioka 5bafeb6
dont hold stuff in memory
anzioka d81dc8a
add batch
anzioka 43a15a6
close connection
anzioka b7a66a6
pass hds
anzioka 4f63272
Merge branch 'task/migrate-preprocessed-data' into task/LATTICE-3024-…
anzioka dffe874
select all participants
anzioka ccc0f97
filter out null stuff
anzioka c282d7c
fix
anzioka File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
332 changes: 332 additions & 0 deletions
332
src/main/kotlin/com/openlattice/mechanic/upgrades/V3AppUsageSurveyMigration.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,332 @@ | ||
package com.openlattice.mechanic.upgrades | ||
|
||
import com.geekbeast.postgres.PostgresArrays | ||
import com.openlattice.authorization.Principal | ||
import com.openlattice.data.requests.NeighborEntityDetails | ||
import com.openlattice.data.storage.MetadataOption | ||
import com.openlattice.data.storage.postgres.PostgresEntityDataQueryService | ||
import com.openlattice.datastore.services.EntitySetManager | ||
import com.openlattice.edm.EdmConstants | ||
import com.openlattice.edm.EdmConstants.Companion.LAST_WRITE_FQN | ||
import com.openlattice.graph.PagedNeighborRequest | ||
import com.openlattice.hazelcast.HazelcastMap | ||
import com.openlattice.mechanic.Toolbox | ||
import com.openlattice.organizations.roles.SecurePrincipalsManager | ||
import com.openlattice.search.SearchService | ||
import com.openlattice.search.requests.EntityNeighborsFilter | ||
import com.zaxxer.hikari.HikariDataSource | ||
import org.apache.olingo.commons.api.edm.FullQualifiedName | ||
import org.slf4j.LoggerFactory | ||
import java.time.OffsetDateTime | ||
import java.util.* | ||
|
||
/** | ||
* @author alfoncenzioka <[email protected]> | ||
*/ | ||
class V3AppUsageSurveyMigration( | ||
toolbox: Toolbox, | ||
private val hds: HikariDataSource, | ||
private val principalService: SecurePrincipalsManager, | ||
private val searchService: SearchService, | ||
private val dataQueryService: PostgresEntityDataQueryService, | ||
private val entitySetService: EntitySetManager | ||
) : Upgrade { | ||
private val logger = LoggerFactory.getLogger(V3AppUsageSurveyMigration::class.java) | ||
|
||
private val organizations = HazelcastMap.ORGANIZATIONS.getMap(toolbox.hazelcast) | ||
private val appConfigs = HazelcastMap.APP_CONFIGS.getMap(toolbox.hazelcast) | ||
private val entitySetIds = HazelcastMap.ENTITY_SETS.getMap(toolbox.hazelcast).values.associateBy { it.name } | ||
|
||
private val allStudyIdsByParticipantEKID: MutableMap<UUID, UUID> = mutableMapOf() | ||
private val allParticipantIdsByEKID: MutableMap<UUID, String?> = mutableMapOf() | ||
private val allSurveyDataByParticipantEKID: MutableMap<UUID, List<Map<FullQualifiedName, Set<Any?>>>> = mutableMapOf() | ||
|
||
companion object { | ||
private val DATA_COLLECTION_APP_ID = UUID.fromString("c4e6d8fd-daf9-41e7-8c59-2a12c7ee0857") | ||
|
||
private val LEGACY_ORG_ID = UUID.fromString("7349c446-2acc-4d14-b2a9-a13be39cff93") | ||
|
||
private const val CHRONICLE_APP_ES_PREFIX = "chronicle_" | ||
private const val DATA_COLLECTION_APP_PREFIX = "chronicle_data_collection_" | ||
private const val USER_APPS = "userapps" | ||
private const val USED_BY = "usedby" | ||
private const val STUDIES = "studies" | ||
private const val PARTICIPANTS = "participants" | ||
private const val PARTICIPATED_IN = "participatedin" | ||
|
||
private const val LEGACY_USER_APPS_ES = "chronicle_user_apps" | ||
private const val LEGACY_STUDIES_ES = "chronicle_study" | ||
private const val LEGACY_PARTICIPATED_IN_ES = "chronicle_participated_in" | ||
private const val LEGACY_USED_BY_ES = "chronicle_used_by" | ||
|
||
private val OL_ID_FQN = EdmConstants.ID_FQN | ||
private val STRING_ID_FQN = FullQualifiedName("general.stringid") | ||
private val PERSON_FQN = FullQualifiedName("nc.SubjectIdentification") | ||
private val USER_FQN = FullQualifiedName("ol.user") | ||
private val FULL_NAME_FQN = FullQualifiedName("general.fullname") | ||
private val DATETIME_FQN = FullQualifiedName("ol.datetime") | ||
private val TIMEZONE_FQN = FullQualifiedName("ol.timezone") | ||
private val TITLE_FQN = FullQualifiedName("ol.title") | ||
|
||
private val FQN_TO_COLUMNS = mapOf( | ||
LAST_WRITE_FQN to "submission_date", | ||
TITLE_FQN to "application_label", | ||
FULL_NAME_FQN to "app_package_name", | ||
DATETIME_FQN to "event_timestamp", | ||
TIMEZONE_FQN to "timezone", | ||
USER_FQN to "users" | ||
) | ||
|
||
private val column_names = listOf("study_id", "participant_id") + FQN_TO_COLUMNS.values.toList() | ||
private val APP_USAGE_SURVEY_COLUMNS = column_names.joinToString(",") { it } | ||
private val APP_USAGE_SURVEY_PARAMS = column_names.joinToString(",") { "?" } | ||
|
||
/** | ||
* PreparedStatement bind order | ||
* 1) studyId | ||
* 2) participantId | ||
* 3) submissionDate, | ||
* 4) appLabel | ||
* 5) packageName | ||
* 6) timestamp | ||
* 7) timezone | ||
* 8) users | ||
*/ | ||
private val INSERT_INTO_APP_USAGE_SQL = """ | ||
INSERT INTO app_usage_survey($APP_USAGE_SURVEY_COLUMNS) values ($APP_USAGE_SURVEY_PARAMS) | ||
ON CONFLICT DO NOTHING | ||
""".trimIndent() | ||
|
||
} | ||
|
||
override fun upgrade(): Boolean { | ||
getAllAppUsageSurveyData() | ||
|
||
logger.info("migrating app usage surveys to v3") | ||
|
||
try { | ||
val written = hds.connection.use { connection -> | ||
connection.prepareStatement(INSERT_INTO_APP_USAGE_SQL).use { ps -> | ||
allSurveyDataByParticipantEKID.forEach { (key, data) -> | ||
val studyId = allStudyIdsByParticipantEKID[key] | ||
val participantId = allParticipantIdsByEKID[key] | ||
|
||
if (studyId == null || participantId == null) { | ||
logger.warn("Skipping migration for participant $key. failed to retrieve associated studyId or participantId") | ||
return@forEach | ||
} | ||
|
||
ps.setObject(1, studyId) | ||
ps.setString(2, participantId) | ||
|
||
data.forEach data@{ entity -> | ||
val users = entity.getOrDefault(USER_FQN, listOf()) | ||
.map { it.toString() }.filter { it.isNotBlank() }.toList() | ||
|
||
if (users.isEmpty() || users.first().toString().isBlank()) { | ||
return@data | ||
} | ||
|
||
val submissionDate = getFirstValueOrNull(entity, LAST_WRITE_FQN) | ||
val applicationLabel = getFirstValueOrNull(entity, TITLE_FQN) | ||
val appPackageName = getFirstValueOrNull(entity, FULL_NAME_FQN) | ||
val timezone = getFirstValueOrNull(entity, TIMEZONE_FQN) | ||
val timestamp = getFirstValueOrNull(entity, DATETIME_FQN) | ||
|
||
if (submissionDate == null || appPackageName == null || timestamp == null) { | ||
return@data | ||
} | ||
|
||
ps.setObject(3, OffsetDateTime.parse(submissionDate)) | ||
ps.setString(4, applicationLabel) | ||
ps.setString(5, appPackageName) | ||
ps.setObject(6, OffsetDateTime.parse(timestamp)) | ||
ps.setString(7, timezone) | ||
ps.setArray(8, PostgresArrays.createTextArray(connection, users)) | ||
ps.addBatch() | ||
} | ||
} | ||
ps.executeBatch().sum() | ||
} | ||
} | ||
|
||
logger.info("wrote {} entities to db. data query service returned {} entities", written, allSurveyDataByParticipantEKID.values.flatten().size) | ||
return true | ||
} catch (ex: Exception) { | ||
logger.error("error migrating app usage survey data to v3", ex) | ||
return false | ||
} | ||
} | ||
|
||
private fun getAllAppUsageSurveyData() { | ||
val v2DataCollectionOrgIds = appConfigs.keys | ||
.filter { it.appId == DATA_COLLECTION_APP_ID } | ||
.map { it.organizationId } | ||
.toMutableSet() | ||
|
||
(v2DataCollectionOrgIds + LEGACY_ORG_ID).forEach org@{ orgId -> | ||
logger.info("======================================") | ||
logger.info("getting app usage survey data for org $orgId") | ||
logger.info("=====================================") | ||
|
||
// entity sets | ||
val orgEntitySetIds = getOrgEntitySetIds(orgId) | ||
|
||
val studiesEntitySetId = orgEntitySetIds.getValue(STUDIES) | ||
val participatedInEntitySetId = orgEntitySetIds.getValue(PARTICIPATED_IN) | ||
val participantsEntitySetId = orgEntitySetIds[PARTICIPANTS] // this will be null if org is legacy | ||
val userAppsEntitySetId = orgEntitySetIds.getValue(USER_APPS) | ||
val usedByEntitySetId = orgEntitySetIds.getValue(USED_BY) | ||
|
||
val adminRoleAclKey = organizations[orgId]?.adminRoleAclKey | ||
if (adminRoleAclKey == null) { | ||
logger.warn("skipping {} since it doesn't have admin role", orgId) | ||
return@org | ||
} | ||
val adminPrincipals = principalService.getAllUsersWithPrincipal(adminRoleAclKey).map { it.principal }.toSet() | ||
|
||
val studies = filterInvalidStudies(getEntitiesByEntityKeyId(studiesEntitySetId)) | ||
if (studies.isEmpty()) { | ||
logger.info("org {} has no studies. skipping", orgId) | ||
return@org | ||
} | ||
logger.info("found {} studies in org {}", studies.size, orgId) | ||
|
||
val studyEntityKeyIds = studies.keys | ||
val studyIds = studies.map { getFirstUUIDOrNull(it.value, STRING_ID_FQN) }.filterNotNull().toSet() | ||
|
||
val participantEntitySetIds = when (orgId) { | ||
LEGACY_ORG_ID -> getLegacyParticipantEntitySetIds(studyIds) | ||
else -> setOf(participantsEntitySetId!!) | ||
} | ||
val participants = getStudyParticipants(studiesEntitySetId, studyEntityKeyIds, participantEntitySetIds, participatedInEntitySetId, adminPrincipals) | ||
|
||
val participantsByStudyId = participants.mapValues { (_, neighbor) -> neighbor.map { it.neighborId.get() }.toSet() } | ||
logger.info("org {} participant count by study {}", orgId, participantsByStudyId.mapValues { it.value.size }) | ||
|
||
val participantIdByEKID = | ||
participants.values | ||
.flatten() | ||
.associate { getFirstUUIDOrNull(it.neighborDetails.get(), OL_ID_FQN)!! to getFirstValueOrNull(it.neighborDetails.get(), PERSON_FQN) } | ||
val studyIdByParticipantEKID = participantsByStudyId | ||
.map { (studyId, participants) -> participants.associateWith { studyId } } | ||
.flatMap { | ||
it.asSequence() | ||
}.associate { it.key to it.value } | ||
|
||
val surveysDataByParticipant = getSurveysDataByParticipant( | ||
adminPrincipals, participantsByStudyId.values.flatten().toSet(), participantEntitySetIds, usedByEntitySetId, userAppsEntitySetId) | ||
|
||
logger.info("found {} survey entities in org {}", surveysDataByParticipant.values.size, orgId) | ||
|
||
allStudyIdsByParticipantEKID += studyIdByParticipantEKID | ||
allParticipantIdsByEKID += participantIdByEKID | ||
allSurveyDataByParticipantEKID += surveysDataByParticipant | ||
} | ||
} | ||
|
||
private fun filterInvalidStudies(entities: Map<UUID, Map<FullQualifiedName, Set<Any>>>): Map<UUID, Map<FullQualifiedName, Set<Any>>> { | ||
return entities.filterValues { getFirstUUIDOrNull(it, STRING_ID_FQN) != null } | ||
} | ||
|
||
// returns a map of collection template name to entity set id | ||
private fun getOrgEntitySetIds(organizationId: UUID): Map<String, UUID> { | ||
val entitySetNameByTemplateName = when (organizationId) { | ||
LEGACY_ORG_ID -> mapOf( | ||
USER_APPS to LEGACY_USER_APPS_ES, | ||
USED_BY to LEGACY_USED_BY_ES, | ||
STUDIES to LEGACY_STUDIES_ES, | ||
PARTICIPATED_IN to LEGACY_PARTICIPATED_IN_ES | ||
) | ||
else -> { | ||
val orgIdToStr = organizationId.toString().replace("-", "") | ||
mapOf( | ||
USER_APPS to "$DATA_COLLECTION_APP_PREFIX${orgIdToStr}_$USER_APPS", | ||
USED_BY to "$DATA_COLLECTION_APP_PREFIX${orgIdToStr}_$USED_BY", | ||
STUDIES to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_$STUDIES", | ||
PARTICIPATED_IN to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_$PARTICIPATED_IN", | ||
PARTICIPANTS to "$CHRONICLE_APP_ES_PREFIX${orgIdToStr}_${PARTICIPANTS}" | ||
) | ||
} | ||
} | ||
|
||
return entitySetNameByTemplateName.mapValues { entitySetIds.getValue(it.value).id } | ||
} | ||
|
||
private fun getLegacyParticipantEntitySetIds(studyIds: Set<UUID>): Set<UUID> { | ||
val entitySetNames = studyIds.map { "chronicle_participants_$it" } | ||
return entitySetNames.map { entitySetIds.getValue(it).id }.toSet() | ||
} | ||
|
||
private fun getStudyParticipants( | ||
studiesEntitySetId: UUID, | ||
studyEntityKeyIds: Set<UUID>, | ||
participantEntitySetIds: Set<UUID>, | ||
participatedInEntitySetId: UUID, | ||
principals: Set<Principal> | ||
|
||
): Map<UUID, MutableList<NeighborEntityDetails>> { | ||
val filter = EntityNeighborsFilter( | ||
studyEntityKeyIds, Optional.of(participantEntitySetIds), Optional.of(setOf(studiesEntitySetId)), Optional.of(setOf(participatedInEntitySetId))) | ||
return searchService.executeEntityNeighborSearch(setOf(studiesEntitySetId), PagedNeighborRequest(filter), principals).neighbors | ||
} | ||
|
||
private fun getSurveysDataByParticipant( | ||
principals: Set<Principal>, | ||
entityKeyIds: Set<UUID>, | ||
participantEntitySetIds: Set<UUID>, | ||
usedByEnEntitySetId: UUID, | ||
userAppsEntitySetId: UUID): Map<UUID, List<Map<FullQualifiedName, Set<Any?>>>> { | ||
|
||
val filter = EntityNeighborsFilter(entityKeyIds, Optional.of(setOf(userAppsEntitySetId)), Optional.of(participantEntitySetIds), Optional.of(setOf(usedByEnEntitySetId))) | ||
|
||
val neighbors = searchService | ||
.executeEntityNeighborSearch(participantEntitySetIds, PagedNeighborRequest(filter), principals).neighbors | ||
.mapValues { (_, v) -> | ||
v.associateBy { getFirstUUIDOrNull(it.associationDetails, OL_ID_FQN)!! } | ||
} | ||
|
||
val associationEntityKeyIds = neighbors.values.flatMap { it.keys }.toSet() | ||
val usedByEntities = getEntitiesByEntityKeyId(usedByEnEntitySetId, associationEntityKeyIds, setOf(MetadataOption.LAST_WRITE)) | ||
|
||
return neighbors | ||
.mapValues { (_, v) -> | ||
v.map { | ||
it.value.neighborDetails.get().toMutableMap() + usedByEntities.getOrDefault(it.key, mapOf()) | ||
} | ||
} | ||
} | ||
|
||
private fun getFirstUUIDOrNull(entity: Map<FullQualifiedName, Set<Any?>>, fqn: FullQualifiedName): UUID? { | ||
return when (val string = getFirstValueOrNull(entity, fqn)) { | ||
null -> null | ||
else -> UUID.fromString(string) | ||
} | ||
} | ||
|
||
private fun getFirstValueOrNull(entity: Map<FullQualifiedName, Set<Any?>>, fqn: FullQualifiedName): String? { | ||
entity[fqn]?.iterator()?.let { | ||
if (it.hasNext()) return it.next().toString() | ||
} | ||
return null | ||
} | ||
|
||
private fun getEntitiesByEntityKeyId( | ||
entitySetId: UUID, | ||
entityKeyIds: Set<UUID> = setOf(), | ||
metadataOptions: Set<MetadataOption> = setOf() | ||
): Map<UUID, Map<FullQualifiedName, Set<Any>>> { | ||
return dataQueryService.getEntitiesWithPropertyTypeFqns( | ||
mapOf(entitySetId to Optional.of(entityKeyIds)), | ||
entitySetService.getPropertyTypesOfEntitySets(setOf(entitySetId)), | ||
mapOf(), | ||
metadataOptions, | ||
Optional.empty(), | ||
false, | ||
) | ||
} | ||
|
||
override fun getSupportedVersion(): Long { | ||
return Version.V2021_07_23.value | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you might need the super user here, this is one of the issues we faced while doing study + participant migration.