From ad350dec6c3a91af9b77c3f86f2381834bf68b88 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Tue, 5 Nov 2024 15:50:20 -0500 Subject: [PATCH] fix: handle ##contig-less VCFs + log header count --- src/api/services/ingestion.go | 77 ++++++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 24 deletions(-) diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index 9c040c4..c00fff2 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -24,6 +24,7 @@ import ( "os/exec" "path" "regexp" + "slices" "strconv" "strings" "sync" @@ -370,7 +371,11 @@ func (i *IngestionService) ProcessVcf( defer gr.Close() scanner := bufio.NewScanner(gr) + var contigs []string // To collect contigs as defined in VCF header + var contigMutex = sync.RWMutex{} + var preMadeContigIndices bool = false + var discoveredHeaders bool = false var headers []string headerSampleIds := make(map[int]string) @@ -401,10 +406,12 @@ func (i *IngestionService) ProcessVcf( fmt.Printf("Error: got invalid contig header '%s' (matches: %v)\n", line, matches) } else { // Valid + // Currently this isn't parallelized, but we use the mutex just in case + contigMutex.Lock() contigs = append(contigs, matches[2]) + contigMutex.Unlock() } - } - if line[0:6] == "#CHROM" { + } else if line[0:6] == "#CHROM" { // Split the string by tabs headers = strings.Split(line, "\t") @@ -421,31 +428,19 @@ func (i *IngestionService) ProcessVcf( // --> create required indices (one per contig) with mappings to ensure ES types are consistent and // mitigate issues we've encountered with e.g., SIGNATURE, where a date field was detected for // info.value. + contigMutex.Lock() fmt.Printf("Got %d contigs: %v\n", len(contigs), contigs) for _, c := range contigs { - var client = i.ElasticsearchClient - var contigIndex = variantIndexName(c) - - res, err := client.Indices.Exists([]string{contigIndex}) - if res.StatusCode == 404 { - mappings, _ := json.Marshal(indexes.VARIANT_INDEX_MAPPING) - res, _ := client.Indices.Create( - contigIndex, - client.Indices.Create.WithBody(strings.NewReader(fmt.Sprintf(`{"mappings": %s}`, mappings))), - ) - - fmt.Printf("Creating contig index %s - got response: %s\n", c, res.String()) - } else if err != nil { - // The actual check didn't work properly (e.g., couldn't contact ES). - fmt.Printf("Contig index %s existence-check got error: %s\n", c, err) - } else { - // The check worked and the index already exists, so we shouldn't try to recreate it. - fmt.Printf("Contig index %s already exists; skipping creation\n", c) - } + i.MakeVariantIndex(c) + } + contigMutex.Unlock() + if len(contigs) > 0 { + // flag to prevent trying to make contig indices on-the-fly during ingestion + preMadeContigIndices = true } discoveredHeaders = true - fmt.Println("Found the headers: ", headers) + fmt.Printf("Found %d headers: %v\n", len(headers), headers) } continue } @@ -478,9 +473,9 @@ func (i *IngestionService) ProcessVcf( rowWg.Add(len(rowComponents)) for rowIndex, rowComponent := range rowComponents { - go func(i int, rc string, rwg *sync.WaitGroup) { + go func(h int, rc string, rwg *sync.WaitGroup) { defer rwg.Done() - key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[i], "#", "", -1))) + key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[h], "#", "", -1))) value := strings.TrimSpace(rc) // if not a vcf header, assume it's a sampleId header @@ -491,6 +486,18 @@ func (i *IngestionService) ProcessVcf( // Strip out all non-numeric characters value = strings.ReplaceAll(value, "chr", "") + if !preMadeContigIndices { + // If we have to make contig indices on the fly, check if we haven't created the contig yet. + // If we haven't, create the index and add it to the contigs slice. + // // A bit janky - O(n) lookup every time for whether contig exists + contigMutex.Lock() + if !slices.Contains(contigs, value) { + i.MakeVariantIndex(value) + contigs = append(contigs, value) + } + contigMutex.Unlock() + } + // ems if value is valid chromosome if chromosome.IsValidHumanChromosome(value) { tmpVariantMapMutex.Lock() @@ -908,6 +915,28 @@ func (i *IngestionService) FilenameAlreadyRunning(filename string) bool { return false } +func (i *IngestionService) MakeVariantIndex(c string) { + var client = i.ElasticsearchClient + var contigIndex = variantIndexName(c) + + res, err := client.Indices.Exists([]string{contigIndex}) + if res.StatusCode == 404 { + mappings, _ := json.Marshal(indexes.VARIANT_INDEX_MAPPING) + res, _ := client.Indices.Create( + contigIndex, + client.Indices.Create.WithBody(strings.NewReader(fmt.Sprintf(`{"mappings": %s}`, mappings))), + ) + + fmt.Printf("Creating contig index %s - got response: %s\n", c, res.String()) + } else if err != nil { + // The actual check didn't work properly (e.g., couldn't contact ES). + fmt.Printf("Contig index %s existence-check got error: %s\n", c, err) + } else { + // The check worked and the index already exists, so we shouldn't try to recreate it. + fmt.Printf("Contig index %s already exists; skipping creation\n", c) + } +} + func variantIndexName(contig string) string { return fmt.Sprintf("variants-%s", strings.ToLower(contig)) }