Skip to content

Commit

Permalink
fix: handle ##contig-less VCFs + log header count
Browse files Browse the repository at this point in the history
  • Loading branch information
davidlougheed committed Nov 5, 2024
1 parent b002215 commit ad350de
Showing 1 changed file with 53 additions and 24 deletions.
77 changes: 53 additions & 24 deletions src/api/services/ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"os/exec"
"path"
"regexp"
"slices"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -370,7 +371,11 @@ func (i *IngestionService) ProcessVcf(
defer gr.Close()

scanner := bufio.NewScanner(gr)

var contigs []string // To collect contigs as defined in VCF header
var contigMutex = sync.RWMutex{}
var preMadeContigIndices bool = false

var discoveredHeaders bool = false
var headers []string
headerSampleIds := make(map[int]string)
Expand Down Expand Up @@ -401,10 +406,12 @@ func (i *IngestionService) ProcessVcf(
fmt.Printf("Error: got invalid contig header '%s' (matches: %v)\n", line, matches)
} else {
// Valid
// Currently this isn't parallelized, but we use the mutex just in case
contigMutex.Lock()
contigs = append(contigs, matches[2])
contigMutex.Unlock()
}
}
if line[0:6] == "#CHROM" {
} else if line[0:6] == "#CHROM" {
// Split the string by tabs
headers = strings.Split(line, "\t")

Expand All @@ -421,31 +428,19 @@ func (i *IngestionService) ProcessVcf(
// --> create required indices (one per contig) with mappings to ensure ES types are consistent and
// mitigate issues we've encountered with e.g., SIGNATURE, where a date field was detected for
// info.value.
contigMutex.Lock()
fmt.Printf("Got %d contigs: %v\n", len(contigs), contigs)
for _, c := range contigs {
var client = i.ElasticsearchClient
var contigIndex = variantIndexName(c)

res, err := client.Indices.Exists([]string{contigIndex})
if res.StatusCode == 404 {
mappings, _ := json.Marshal(indexes.VARIANT_INDEX_MAPPING)
res, _ := client.Indices.Create(
contigIndex,
client.Indices.Create.WithBody(strings.NewReader(fmt.Sprintf(`{"mappings": %s}`, mappings))),
)

fmt.Printf("Creating contig index %s - got response: %s\n", c, res.String())
} else if err != nil {
// The actual check didn't work properly (e.g., couldn't contact ES).
fmt.Printf("Contig index %s existence-check got error: %s\n", c, err)
} else {
// The check worked and the index already exists, so we shouldn't try to recreate it.
fmt.Printf("Contig index %s already exists; skipping creation\n", c)
}
i.MakeVariantIndex(c)
}
contigMutex.Unlock()
if len(contigs) > 0 {
// flag to prevent trying to make contig indices on-the-fly during ingestion
preMadeContigIndices = true
}

discoveredHeaders = true
fmt.Println("Found the headers: ", headers)
fmt.Printf("Found %d headers: %v\n", len(headers), headers)
}
continue
}
Expand Down Expand Up @@ -478,9 +473,9 @@ func (i *IngestionService) ProcessVcf(
rowWg.Add(len(rowComponents))

for rowIndex, rowComponent := range rowComponents {
go func(i int, rc string, rwg *sync.WaitGroup) {
go func(h int, rc string, rwg *sync.WaitGroup) {
defer rwg.Done()
key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[i], "#", "", -1)))
key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[h], "#", "", -1)))
value := strings.TrimSpace(rc)

// if not a vcf header, assume it's a sampleId header
Expand All @@ -491,6 +486,18 @@ func (i *IngestionService) ProcessVcf(
// Strip out all non-numeric characters
value = strings.ReplaceAll(value, "chr", "")

if !preMadeContigIndices {
// If we have to make contig indices on the fly, check if we haven't created the contig yet.
// If we haven't, create the index and add it to the contigs slice.
// // A bit janky - O(n) lookup every time for whether contig exists
contigMutex.Lock()
if !slices.Contains(contigs, value) {
i.MakeVariantIndex(value)
contigs = append(contigs, value)
}
contigMutex.Unlock()
}

// ems if value is valid chromosome
if chromosome.IsValidHumanChromosome(value) {
tmpVariantMapMutex.Lock()
Expand Down Expand Up @@ -908,6 +915,28 @@ func (i *IngestionService) FilenameAlreadyRunning(filename string) bool {
return false
}

// MakeVariantIndex makes sure the Elasticsearch index for contig c exists,
// creating it with indexes.VARIANT_INDEX_MAPPING when the existence check
// reports 404. Creating the index with an explicit mapping keeps field types
// consistent instead of relying on dynamic type detection.
//
// The method is best-effort: every outcome (created, already present, or any
// failure along the way) is reported on stdout and nothing is returned.
func (i *IngestionService) MakeVariantIndex(c string) {
	var client = i.ElasticsearchClient
	var contigIndex = variantIndexName(c)

	res, err := client.Indices.Exists([]string{contigIndex})
	if err != nil {
		// The actual check didn't work properly (e.g., couldn't contact ES).
		// This must be tested before touching res: on a transport failure res
		// is nil and reading res.StatusCode would panic.
		fmt.Printf("Contig index %s existence-check got error: %s\n", c, err)
		return
	}
	if res == nil {
		// Defensive: never dereference a missing response.
		fmt.Printf("Contig index %s existence-check returned no response\n", c)
		return
	}
	if res.Body != nil {
		// Release the underlying connection once we are done with the response.
		defer res.Body.Close()
	}

	if res.StatusCode != 404 {
		// The check worked and the index already exists, so we shouldn't try to recreate it.
		fmt.Printf("Contig index %s already exists; skipping creation\n", c)
		return
	}

	mappings, err := json.Marshal(indexes.VARIANT_INDEX_MAPPING)
	if err != nil {
		// Without a serialized mapping we would create a malformed request body.
		fmt.Printf("Contig index %s mapping could not be serialized: %s\n", c, err)
		return
	}

	createRes, err := client.Indices.Create(
		contigIndex,
		client.Indices.Create.WithBody(strings.NewReader(fmt.Sprintf(`{"mappings": %s}`, mappings))),
	)
	if err != nil {
		fmt.Printf("Creating contig index %s got error: %s\n", c, err)
		return
	}
	if createRes == nil {
		fmt.Printf("Creating contig index %s returned no response\n", c)
		return
	}
	if createRes.Body != nil {
		defer createRes.Body.Close()
	}

	fmt.Printf("Creating contig index %s - got response: %s\n", c, createRes.String())
}

// variantIndexName returns the name of the variants index for the given
// contig: the prefix "variants-" followed by the lower-cased contig.
func variantIndexName(contig string) string {
	lowered := strings.ToLower(contig)
	name := fmt.Sprintf("variants-%s", lowered)
	return name
}

0 comments on commit ad350de

Please sign in to comment.