Skip to content

Commit

Permalink
refact(ingestion): make indices on the fly
Browse files Browse the repository at this point in the history
  • Loading branch information
davidlougheed committed Nov 6, 2024
1 parent 0d66931 commit b433d15
Showing 1 changed file with 10 additions and 48 deletions.
58 changes: 10 additions & 48 deletions src/api/services/ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"os"
"os/exec"
"path"
"regexp"
"slices"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -372,9 +370,8 @@ func (i *IngestionService) ProcessVcf(

scanner := bufio.NewScanner(gr)

var contigs []string // To collect contigs as defined in VCF header
contigs := make(map[string]struct{}) // To collect contigs as defined in VCF header
var contigMutex = sync.RWMutex{}
var preMadeContigIndices bool = false

var discoveredHeaders bool = false
var headers []string
Expand All @@ -388,30 +385,12 @@ func (i *IngestionService) ProcessVcf(
// - manage # of lines being concurrently processed per file at any given time
lineProcessingQueue := make(chan bool, lineProcessingConcurrencyLevel)

// pattern for contig headers
// - sectioning off the chr prefix strips it from the contig name prior to ingestion, more or less preserving
// previous Gohan behaviour (which did a find and replace.)
var contig_pattern = regexp.MustCompile(`##contig=<ID=(chr)?([a-zA-Z0-9_\-.]+)(,.*)?`)

for scanner.Scan() {
// Gather Header row by seeking the CHROM string
// Collect contigs (chromosomes) to create indices
line := scanner.Text()
if !discoveredHeaders {
if line[0:8] == "##contig" {
var matches = contig_pattern.FindStringSubmatch(line)

if len(matches) == 0 || matches[2] == "" {
// Invalid contig name - error
fmt.Printf("Error: got invalid contig header '%s' (matches: %v)\n", line, matches)
} else {
// Valid
// Currently this isn't parallelized, but we use the mutex just in case
contigMutex.Lock()
contigs = append(contigs, matches[2])
contigMutex.Unlock()
}
} else if line[0:6] == "#CHROM" {
if line[0:6] == "#CHROM" {
// Split the string by tabs
headers = strings.Split(line, "\t")

Expand All @@ -424,21 +403,6 @@ func (i *IngestionService) ProcessVcf(
}
}

// If we got to the VCF final header line, we've found all the contigs possible
// --> create required indices (one per contig) with mappings to ensure ES types are consistent and
// mitigate issues we've encountered with e.g., SIGNATURE, where a date field was detected for
// info.value.
contigMutex.Lock()
fmt.Printf("Got %d contigs: %v\n", len(contigs), contigs)
for _, c := range contigs {
i.MakeVariantIndex(c)
}
contigMutex.Unlock()
if len(contigs) > 0 {
// flag to prevent trying to make contig indices on-the-fly during ingestion
preMadeContigIndices = true
}

discoveredHeaders = true
fmt.Printf("Found %d headers: %v\n", len(headers), headers)
}
Expand Down Expand Up @@ -486,17 +450,15 @@ func (i *IngestionService) ProcessVcf(
// Strip out chr prefix
value = strings.ReplaceAll(value, "chr", "")

if !preMadeContigIndices {
// If we have to make contig indices on the fly, check if we haven't created the contig yet.
// If we haven't, create the index and add it to the contigs slice.
// // A bit janky - O(n) lookup every time for whether contig exists
contigMutex.Lock()
if !slices.Contains(contigs, value) {
i.MakeVariantIndex(value)
contigs = append(contigs, value)
}
contigMutex.Unlock()
// We're making contig indices on the fly - check if we haven't created the contig yet.
// If we haven't, create the index and add it to the contigs "set" (map).
contigMutex.Lock()
_, indexExists := contigs[value]
if !indexExists {
i.MakeVariantIndex(value)
contigs[value] = struct{}{} // add contig to the "set" of created configs
}
contigMutex.Unlock()

// ems if value is valid chromosome
if chromosome.IsValidHumanChromosome(value) {
Expand Down

0 comments on commit b433d15

Please sign in to comment.