diff --git a/etc/example.env b/etc/example.env index a8d4c8d..b3f3f0c 100644 --- a/etc/example.env +++ b/etc/example.env @@ -2,7 +2,7 @@ GOHAN_DEBUG=false GOHAN_SERVICE_CONTACT=someone@somewhere.ca -GOHAN_SEMVER=5.0.2 +GOHAN_SEMVER=6.0.0 GOHAN_SERVICES="gateway api elasticsearch kibana drs authorization" # GOOS=linux @@ -39,8 +39,8 @@ GOHAN_API_IMAGE=gohan-api GOHAN_API_VERSION=latest GOHAN_API_BUILDER_BASE_IMAGE=golang:1.21-bookworm -GOHAN_API_DEV_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:golang-debian-2024.09.01 -GOHAN_API_PROD_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:plain-debian-2024.09.01 +GOHAN_API_DEV_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:golang-debian-2024.11.01 +GOHAN_API_PROD_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:plain-debian-2024.11.01 GOHAN_API_CONTAINER_NAME=gohan-api GOHAN_API_SERVICE_HOST=0.0.0.0 diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index 0010f3f..21b707d 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -368,7 +368,10 @@ func (i *IngestionService) ProcessVcf( defer gr.Close() scanner := bufio.NewScanner(gr) - var contigs []string // To collect contigs as defined in VCF header + + contigs := make(map[string]struct{}) // To collect contigs as defined in VCF header + var contigMutex = sync.RWMutex{} + var discoveredHeaders bool = false var headers []string headerSampleIds := make(map[int]string) @@ -386,44 +389,21 @@ func (i *IngestionService) ProcessVcf( // Collect contigs (chromosomes) to create indices line := scanner.Text() if !discoveredHeaders { - if line[0:8] == "##contig" { - contigs = append( - contigs, - strings.TrimSpace(strings.Replace(strings.Replace(strings.Replace(line, "##contig=", "", 1), "chr", "", 1)), - ) - } if line[0:6] == "#CHROM" { // Split the string by tabs headers = strings.Split(line, "\t") - for id, header := range headers { + for idx, header := range headers { // determine if header is a default VCF header. // if it is not, assume it's a sampleId and keep // track of it with an id if !utils.StringInSlice(strings.ToLower(strings.TrimSpace(strings.ReplaceAll(header, "#", ""))), constants.VcfHeaders) { - headerSampleIds[len(constants.VcfHeaders)-id] = header + headerSampleIds[len(constants.VcfHeaders)-idx] = header } } - // If we got to the VCF final header line, we've found all the contigs possible - // --> create required indices (one per contig) with mappings to ensure ES types are consistent and - // mitigate issues we've encountered with e.g., SIGNATURE, where a date field was detected for - // info.value. - fmt.Printf("Got %d contigs: %v\n", len(contigs), contigs) - for _, c := range contigs { - var client = i.ElasticsearchClient - - mappings, _ := json.Marshal(indexes.VARIANT_INDEX_MAPPING) - res, _ := client.Indices.Create( - variantIndexName(c), - client.Indices.Create.WithBody(strings.NewReader(fmt.Sprintf(`{"mappings": %s}`, mappings))), - ) - - fmt.Printf("Creating contig index %s - got response: %s\n", c, res.String()) - } - discoveredHeaders = true - fmt.Println("Found the headers: ", headers) + fmt.Printf("Found %d headers: %v\n", len(headers), headers) } continue } @@ -455,10 +435,10 @@ func (i *IngestionService) ProcessVcf( var rowWg sync.WaitGroup rowWg.Add(len(rowComponents)) - for rowIndex, rowComponent := range rowComponents { - go func(i int, rc string, rwg *sync.WaitGroup) { + for colIdx, colVal := range rowComponents { + go func(h int, rc string, rwg *sync.WaitGroup) { defer rwg.Done() - key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[i], "#", "", -1))) + key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[h], "#", "", -1))) value := strings.TrimSpace(rc) // if not a vcf header, assume it's a sampleId header @@ -469,6 +449,16 @@ func (i *IngestionService) ProcessVcf( // Strip out chr prefix for some normalization with human/model-organism contigs value = strings.ReplaceAll(value, "chr", "") + // We're making contig indices on the fly - check if we haven't created the contig yet. + // If we haven't, create the index and add it to the contigs "set" (map). + contigMutex.Lock() + _, indexExists := contigs[value] + if !indexExists { + i.MakeVariantIndex(value) + contigs[value] = struct{}{} // add contig to the "set" of created configs + } + contigMutex.Unlock() + tmpVariantMapMutex.Lock() tmpVariant[key] = value tmpVariantMapMutex.Unlock() @@ -563,7 +553,7 @@ func (i *IngestionService) ProcessVcf( }) tmpSamplesMutex.Unlock() } - }(rowIndex, rowComponent, &rowWg) + }(colIdx, colVal, &rowWg) } rowWg.Wait() @@ -875,6 +865,28 @@ func (i *IngestionService) FilenameAlreadyRunning(filename string) bool { return false } +func (i *IngestionService) MakeVariantIndex(c string) { + var client = i.ElasticsearchClient + var contigIndex = variantIndexName(c) + + res, err := client.Indices.Exists([]string{contigIndex}) + if res.StatusCode == 404 { + mappings, _ := json.Marshal(indexes.VARIANT_INDEX_MAPPING) + res, _ := client.Indices.Create( + contigIndex, + client.Indices.Create.WithBody(strings.NewReader(fmt.Sprintf(`{"mappings": %s}`, mappings))), + ) + + fmt.Printf("Creating contig index %s - got response: %s\n", c, res.String()) + } else if err != nil { + // The actual check didn't work properly (e.g., couldn't contact ES). + fmt.Printf("Contig index %s existence-check got error: %s\n", c, err) + } else { + // The check worked and the index already exists, so we shouldn't try to recreate it. + fmt.Printf("Contig index %s already exists; skipping creation\n", c) + } +} + func variantIndexName(contig string) string { return fmt.Sprintf("variants-%s", strings.ToLower(contig)) }