From 4ee4b80e8c4f9658cde067b8f12edbf915df05a7 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Mon, 4 Nov 2024 15:31:52 -0500 Subject: [PATCH 01/10] fix: error when ingesting after index already exists --- src/api/services/ingestion.go | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index 06f8a60..24d0cca 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -413,14 +413,22 @@ func (i *IngestionService) ProcessVcf( fmt.Printf("Got %d contigs: %v\n", len(contigs), contigs) for _, c := range contigs { var client = i.ElasticsearchClient + var contigIndex = variantIndexName(c) + + res, err := client.Indices.Exists([]string{contigIndex}) + if res.StatusCode == 404 { + mappings, _ := json.Marshal(indexes.VARIANT_INDEX_MAPPING) + res, _ := client.Indices.Create( + contigIndex, + client.Indices.Create.WithBody(strings.NewReader(fmt.Sprintf(`{"mappings": %s}`, mappings))), + ) - mappings, _ := json.Marshal(indexes.VARIANT_INDEX_MAPPING) - res, _ := client.Indices.Create( - variantIndexName(c), - client.Indices.Create.WithBody(strings.NewReader(fmt.Sprintf(`{"mappings": %s}`, mappings))), - ) - - fmt.Printf("Creating contig index %s - got response: %s\n", c, res.String()) + fmt.Printf("Creating contig index %s - got response: %s\n", c, res.String()) + } else if err != nil { + fmt.Printf("Contig index %s existence-check got error: %s\n", c, err) + } else { + fmt.Printf("Contig index %s already exists; skipping creation\n", c) + } } discoveredHeaders = true From f6c3708ce461c70cd33ddf10c936d2566c5ccdc3 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Mon, 4 Nov 2024 15:32:50 -0500 Subject: [PATCH 02/10] chore: comment --- src/api/services/ingestion.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index 24d0cca..a773b10 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -425,8 +425,10 @@ func (i *IngestionService) ProcessVcf( fmt.Printf("Creating contig index %s - got response: %s\n", c, res.String()) } else if err != nil { + // The actual check didn't work properly (e.g., couldn't contact ES). fmt.Printf("Contig index %s existence-check got error: %s\n", c, err) } else { + // The check worked and the index already exists, so we shouldn't try to recreate it. fmt.Printf("Contig index %s already exists; skipping creation\n", c) } } From 7c8e7ee59c35312c0ecd2608b5a1c8d3a513bfbd Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Mon, 4 Nov 2024 17:06:02 -0500 Subject: [PATCH 03/10] fix: contig header line parsing --- src/api/services/ingestion.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index a773b10..b6b0134 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -23,6 +23,7 @@ import ( "os" "os/exec" "path" + "regexp" "strconv" "strings" "sync" @@ -382,16 +383,24 @@ func (i *IngestionService) ProcessVcf( // - manage # of lines being concurrently processed per file at any given time lineProcessingQueue := make(chan bool, lineProcessingConcurrencyLevel) + // pattern for contig headers + var contig_pattern = regexp.MustCompile(`##contig=", "", 1), "chr", "", 1)), - ) + var matches = contig_pattern.FindStringSubmatch(line) + + if len(matches) == 0 || matches[2] == "" { + // Invalid contig name - error + fmt.Printf("Error: got invalid contig header '%s' (matches: %v)\n", line, matches) + } else { + // Valid + contigs = append(contigs, matches[2]) + } } if line[0:6] == "#CHROM" { // Split the string by tabs From 1a88b8019bd555b9490d1d9b6033d3ad42834dfa Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Mon, 4 Nov 2024 17:07:22 -0500 Subject: [PATCH 04/10] chore: comment pattern --- src/api/services/ingestion.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index b6b0134..9c040c4 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -384,6 +384,8 @@ func (i *IngestionService) ProcessVcf( lineProcessingQueue := make(chan bool, lineProcessingConcurrencyLevel) // pattern for contig headers + // - sectioning off the chr prefix strips it from the contig name prior to ingestion, more or less preserving + // previous Gohan behaviour (which did a find and replace.) var contig_pattern = regexp.MustCompile(`##contig= Date: Tue, 5 Nov 2024 11:00:40 -0500 Subject: [PATCH 05/10] chore: update version + base image --- etc/example.env | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/etc/example.env b/etc/example.env index a8d4c8d..b3f3f0c 100644 --- a/etc/example.env +++ b/etc/example.env @@ -2,7 +2,7 @@ GOHAN_DEBUG=false GOHAN_SERVICE_CONTACT=someone@somewhere.ca -GOHAN_SEMVER=5.0.2 +GOHAN_SEMVER=6.0.0 GOHAN_SERVICES="gateway api elasticsearch kibana drs authorization" # GOOS=linux @@ -39,8 +39,8 @@ GOHAN_API_IMAGE=gohan-api GOHAN_API_VERSION=latest GOHAN_API_BUILDER_BASE_IMAGE=golang:1.21-bookworm -GOHAN_API_DEV_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:golang-debian-2024.09.01 -GOHAN_API_PROD_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:plain-debian-2024.09.01 +GOHAN_API_DEV_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:golang-debian-2024.11.01 +GOHAN_API_PROD_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:plain-debian-2024.11.01 GOHAN_API_CONTAINER_NAME=gohan-api GOHAN_API_SERVICE_HOST=0.0.0.0 From ad350dec6c3a91af9b77c3f86f2381834bf68b88 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Tue, 5 Nov 2024 15:50:20 -0500 Subject: [PATCH 06/10] fix: handle ##contig-less VCFs + log header count --- src/api/services/ingestion.go | 77 ++++++++++++++++++++++++----------- 1 file changed, 53 insertions(+), 24 deletions(-) diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index 9c040c4..c00fff2 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -24,6 +24,7 @@ import ( "os/exec" "path" "regexp" + "slices" "strconv" "strings" "sync" @@ -370,7 +371,11 @@ func (i *IngestionService) ProcessVcf( defer gr.Close() scanner := bufio.NewScanner(gr) + var contigs []string // To collect contigs as defined in VCF header + var contigMutex = sync.RWMutex{} + var preMadeContigIndices bool = false + var discoveredHeaders bool = false var headers []string headerSampleIds := make(map[int]string) @@ -401,10 +406,12 @@ func (i *IngestionService) ProcessVcf( fmt.Printf("Error: got invalid contig header '%s' (matches: %v)\n", line, matches) } else { // Valid + // Currently this isn't parallelized, but we use the mutex just in case + contigMutex.Lock() contigs = append(contigs, matches[2]) + contigMutex.Unlock() } - } - if line[0:6] == "#CHROM" { + } else if line[0:6] == "#CHROM" { // Split the string by tabs headers = strings.Split(line, "\t") @@ -421,31 +428,19 @@ func (i *IngestionService) ProcessVcf( // --> create required indices (one per contig) with mappings to ensure ES types are consistent and // mitigate issues we've encountered with e.g., SIGNATURE, where a date field was detected for // info.value. + contigMutex.Lock() fmt.Printf("Got %d contigs: %v\n", len(contigs), contigs) for _, c := range contigs { - var client = i.ElasticsearchClient - var contigIndex = variantIndexName(c) - - res, err := client.Indices.Exists([]string{contigIndex}) - if res.StatusCode == 404 { - mappings, _ := json.Marshal(indexes.VARIANT_INDEX_MAPPING) - res, _ := client.Indices.Create( - contigIndex, - client.Indices.Create.WithBody(strings.NewReader(fmt.Sprintf(`{"mappings": %s}`, mappings))), - ) - - fmt.Printf("Creating contig index %s - got response: %s\n", c, res.String()) - } else if err != nil { - // The actual check didn't work properly (e.g., couldn't contact ES). - fmt.Printf("Contig index %s existence-check got error: %s\n", c, err) - } else { - // The check worked and the index already exists, so we shouldn't try to recreate it. - fmt.Printf("Contig index %s already exists; skipping creation\n", c) - } + i.MakeVariantIndex(c) + } + contigMutex.Unlock() + if len(contigs) > 0 { + // flag to prevent trying to make contig indices on-the-fly during ingestion + preMadeContigIndices = true } discoveredHeaders = true - fmt.Println("Found the headers: ", headers) + fmt.Printf("Found %d headers: %v\n", len(headers), headers) } continue } @@ -478,9 +473,9 @@ func (i *IngestionService) ProcessVcf( rowWg.Add(len(rowComponents)) for rowIndex, rowComponent := range rowComponents { - go func(i int, rc string, rwg *sync.WaitGroup) { + go func(h int, rc string, rwg *sync.WaitGroup) { defer rwg.Done() - key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[i], "#", "", -1))) + key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[h], "#", "", -1))) value := strings.TrimSpace(rc) // if not a vcf header, assume it's a sampleId header @@ -491,6 +486,18 @@ func (i *IngestionService) ProcessVcf( // Strip out all non-numeric characters value = strings.ReplaceAll(value, "chr", "") + if !preMadeContigIndices { + // If we have to make contig indices on the fly, check if we haven't created the contig yet. + // If we haven't, create the index and add it to the contigs slice. + // // A bit janky - O(n) lookup every time for whether contig exists + contigMutex.Lock() + if !slices.Contains(contigs, value) { + i.MakeVariantIndex(value) + contigs = append(contigs, value) + } + contigMutex.Unlock() + } + // ems if value is valid chromosome if chromosome.IsValidHumanChromosome(value) { tmpVariantMapMutex.Lock() @@ -908,6 +915,28 @@ func (i *IngestionService) FilenameAlreadyRunning(filename string) bool { return false } +func (i *IngestionService) MakeVariantIndex(c string) { + var client = i.ElasticsearchClient + var contigIndex = variantIndexName(c) + + res, err := client.Indices.Exists([]string{contigIndex}) + if res.StatusCode == 404 { + mappings, _ := json.Marshal(indexes.VARIANT_INDEX_MAPPING) + res, _ := client.Indices.Create( + contigIndex, + client.Indices.Create.WithBody(strings.NewReader(fmt.Sprintf(`{"mappings": %s}`, mappings))), + ) + + fmt.Printf("Creating contig index %s - got response: %s\n", c, res.String()) + } else if err != nil { + // The actual check didn't work properly (e.g., couldn't contact ES). + fmt.Printf("Contig index %s existence-check got error: %s\n", c, err) + } else { + // The check worked and the index already exists, so we shouldn't try to recreate it. + fmt.Printf("Contig index %s already exists; skipping creation\n", c) + } +} + func variantIndexName(contig string) string { return fmt.Sprintf("variants-%s", strings.ToLower(contig)) } From 57f503db9acc5ccbda16970de8f422d1eef9fcfe Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Tue, 5 Nov 2024 15:58:49 -0500 Subject: [PATCH 07/10] chore: rename confusing ingestion variables --- src/api/services/ingestion.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index c00fff2..0547534 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -472,7 +472,7 @@ func (i *IngestionService) ProcessVcf( var rowWg sync.WaitGroup rowWg.Add(len(rowComponents)) - for rowIndex, rowComponent := range rowComponents { + for colIdx, colVal := range rowComponents { go func(h int, rc string, rwg *sync.WaitGroup) { defer rwg.Done() key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[h], "#", "", -1))) @@ -603,7 +603,7 @@ func (i *IngestionService) ProcessVcf( }) tmpSamplesMutex.Unlock() } - }(rowIndex, rowComponent, &rowWg) + }(colIdx, colVal, &rowWg) } rowWg.Wait() From 4e5297ecf1e1b66d3f27ebdf064fbc6a4e5c3638 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Tue, 5 Nov 2024 16:02:24 -0500 Subject: [PATCH 08/10] chore: fix confusing comment --- src/api/services/ingestion.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index 0547534..14ecf18 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -483,7 +483,7 @@ func (i *IngestionService) ProcessVcf( // filter field type by column name if key == "chrom" { - // Strip out all non-numeric characters + // Strip out chr prefix value = strings.ReplaceAll(value, "chr", "") if !preMadeContigIndices { From 0d6693137873164b6634a6358999cc2d138aea9c Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Tue, 5 Nov 2024 16:08:32 -0500 Subject: [PATCH 09/10] chore: rename idx iter var --- src/api/services/ingestion.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index 14ecf18..90577e7 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -415,12 +415,12 @@ func (i *IngestionService) ProcessVcf( // Split the string by tabs headers = strings.Split(line, "\t") - for id, header := range headers { + for idx, header := range headers { // determine if header is a default VCF header. // if it is not, assume it's a sampleId and keep // track of it with an id if !utils.StringInSlice(strings.ToLower(strings.TrimSpace(strings.ReplaceAll(header, "#", ""))), constants.VcfHeaders) { - headerSampleIds[len(constants.VcfHeaders)-id] = header + headerSampleIds[len(constants.VcfHeaders)-idx] = header } } From b433d15968eb8a9f35cac9103de1af49a3feaad6 Mon Sep 17 00:00:00 2001 From: David Lougheed Date: Wed, 6 Nov 2024 10:12:13 -0500 Subject: [PATCH 10/10] refact(ingestion): make indices on the fly --- src/api/services/ingestion.go | 58 ++++++----------------------------- 1 file changed, 10 insertions(+), 48 deletions(-) diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index 90577e7..fbd4c3f 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -23,8 +23,6 @@ import ( "os" "os/exec" "path" - "regexp" - "slices" "strconv" "strings" "sync" @@ -372,9 +370,8 @@ func (i *IngestionService) ProcessVcf( scanner := bufio.NewScanner(gr) - var contigs []string // To collect contigs as defined in VCF header + contigs := make(map[string]struct{}) // To collect contigs as defined in VCF header var contigMutex = sync.RWMutex{} - var preMadeContigIndices bool = false var discoveredHeaders bool = false var headers []string @@ -388,30 +385,12 @@ func (i *IngestionService) ProcessVcf( // - manage # of lines being concurrently processed per file at any given time lineProcessingQueue := make(chan bool, lineProcessingConcurrencyLevel) - // pattern for contig headers - // - sectioning off the chr prefix strips it from the contig name prior to ingestion, more or less preserving - // previous Gohan behaviour (which did a find and replace.) - var contig_pattern = regexp.MustCompile(`##contig= create required indices (one per contig) with mappings to ensure ES types are consistent and - // mitigate issues we've encountered with e.g., SIGNATURE, where a date field was detected for - // info.value. - contigMutex.Lock() - fmt.Printf("Got %d contigs: %v\n", len(contigs), contigs) - for _, c := range contigs { - i.MakeVariantIndex(c) - } - contigMutex.Unlock() - if len(contigs) > 0 { - // flag to prevent trying to make contig indices on-the-fly during ingestion - preMadeContigIndices = true - } - discoveredHeaders = true fmt.Printf("Found %d headers: %v\n", len(headers), headers) } @@ -486,17 +450,15 @@ func (i *IngestionService) ProcessVcf( // Strip out chr prefix value = strings.ReplaceAll(value, "chr", "") - if !preMadeContigIndices { - // If we have to make contig indices on the fly, check if we haven't created the contig yet. - // If we haven't, create the index and add it to the contigs slice. - // // A bit janky - O(n) lookup every time for whether contig exists - contigMutex.Lock() - if !slices.Contains(contigs, value) { - i.MakeVariantIndex(value) - contigs = append(contigs, value) - } - contigMutex.Unlock() + // We're making contig indices on the fly - check if we haven't created the contig yet. + // If we haven't, create the index and add it to the contigs "set" (map). + contigMutex.Lock() + _, indexExists := contigs[value] + if !indexExists { + i.MakeVariantIndex(value) + contigs[value] = struct{}{} // add contig to the "set" of created configs } + contigMutex.Unlock() // ems if value is valid chromosome if chromosome.IsValidHumanChromosome(value) {