Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/refact/fixed-mappings' into chor…
Browse files Browse the repository at this point in the history
…e/support-all-chroms

# Conflicts:
#	src/api/services/ingestion.go
  • Loading branch information
davidlougheed committed Nov 6, 2024
2 parents b17d8e1 + b433d15 commit c5642a8
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 34 deletions.
6 changes: 3 additions & 3 deletions etc/example.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

GOHAN_DEBUG=false
GOHAN_SERVICE_CONTACT=[email protected]
GOHAN_SEMVER=5.0.2
GOHAN_SEMVER=6.0.0
GOHAN_SERVICES="gateway api elasticsearch kibana drs authorization"

# GOOS=linux
Expand Down Expand Up @@ -39,8 +39,8 @@ GOHAN_API_IMAGE=gohan-api
GOHAN_API_VERSION=latest

GOHAN_API_BUILDER_BASE_IMAGE=golang:1.21-bookworm
GOHAN_API_DEV_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:golang-debian-2024.09.01
GOHAN_API_PROD_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:plain-debian-2024.09.01
GOHAN_API_DEV_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:golang-debian-2024.11.01
GOHAN_API_PROD_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:plain-debian-2024.11.01

GOHAN_API_CONTAINER_NAME=gohan-api
GOHAN_API_SERVICE_HOST=0.0.0.0
Expand Down
74 changes: 43 additions & 31 deletions src/api/services/ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,10 @@ func (i *IngestionService) ProcessVcf(
defer gr.Close()

scanner := bufio.NewScanner(gr)
var contigs []string // To collect contigs as defined in VCF header

contigs := make(map[string]struct{}) // To collect contigs as defined in VCF header
var contigMutex = sync.RWMutex{}

var discoveredHeaders bool = false
var headers []string
headerSampleIds := make(map[int]string)
Expand All @@ -386,44 +389,21 @@ func (i *IngestionService) ProcessVcf(
// Collect contigs (chromosomes) to create indices
line := scanner.Text()
if !discoveredHeaders {
if line[0:8] == "##contig" {
contigs = append(
contigs,
strings.TrimSpace(strings.Replace(strings.Replace(strings.Replace(line, "##contig=<ID=", "", 1), ">", "", 1), "chr", "", 1)),
)
}
if line[0:6] == "#CHROM" {
// Split the string by tabs
headers = strings.Split(line, "\t")

for id, header := range headers {
for idx, header := range headers {
// determine if header is a default VCF header.
// if it is not, assume it's a sampleId and keep
// track of it with an id
if !utils.StringInSlice(strings.ToLower(strings.TrimSpace(strings.ReplaceAll(header, "#", ""))), constants.VcfHeaders) {
headerSampleIds[len(constants.VcfHeaders)-id] = header
headerSampleIds[len(constants.VcfHeaders)-idx] = header
}
}

// If we got to the VCF final header line, we've found all the contigs possible
// --> create required indices (one per contig) with mappings to ensure ES types are consistent and
// mitigate issues we've encountered with e.g., SIGNATURE, where a date field was detected for
// info.value.
fmt.Printf("Got %d contigs: %v\n", len(contigs), contigs)
for _, c := range contigs {
var client = i.ElasticsearchClient

mappings, _ := json.Marshal(indexes.VARIANT_INDEX_MAPPING)
res, _ := client.Indices.Create(
variantIndexName(c),
client.Indices.Create.WithBody(strings.NewReader(fmt.Sprintf(`{"mappings": %s}`, mappings))),
)

fmt.Printf("Creating contig index %s - got response: %s\n", c, res.String())
}

discoveredHeaders = true
fmt.Println("Found the headers: ", headers)
fmt.Printf("Found %d headers: %v\n", len(headers), headers)
}
continue
}
Expand Down Expand Up @@ -455,10 +435,10 @@ func (i *IngestionService) ProcessVcf(
var rowWg sync.WaitGroup
rowWg.Add(len(rowComponents))

for rowIndex, rowComponent := range rowComponents {
go func(i int, rc string, rwg *sync.WaitGroup) {
for colIdx, colVal := range rowComponents {
go func(h int, rc string, rwg *sync.WaitGroup) {
defer rwg.Done()
key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[i], "#", "", -1)))
key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[h], "#", "", -1)))
value := strings.TrimSpace(rc)

// if not a vcf header, assume it's a sampleId header
Expand All @@ -469,6 +449,16 @@ func (i *IngestionService) ProcessVcf(
// Strip out chr prefix for some normalization with human/model-organism contigs
value = strings.ReplaceAll(value, "chr", "")

// We're making contig indices on the fly - check if we haven't created the contig yet.
// If we haven't, create the index and add it to the contigs "set" (map).
contigMutex.Lock()
_, indexExists := contigs[value]
if !indexExists {
i.MakeVariantIndex(value)
contigs[value] = struct{}{} // add contig to the "set" of created configs
}
contigMutex.Unlock()

tmpVariantMapMutex.Lock()
tmpVariant[key] = value
tmpVariantMapMutex.Unlock()
Expand Down Expand Up @@ -563,7 +553,7 @@ func (i *IngestionService) ProcessVcf(
})
tmpSamplesMutex.Unlock()
}
}(rowIndex, rowComponent, &rowWg)
}(colIdx, colVal, &rowWg)
}

rowWg.Wait()
Expand Down Expand Up @@ -875,6 +865,28 @@ func (i *IngestionService) FilenameAlreadyRunning(filename string) bool {
return false
}

// MakeVariantIndex makes sure the Elasticsearch index for contig c exists.
//
// It first asks Elasticsearch whether the index (named by variantIndexName)
// is already present. Only when the existence check answers 404 does it create
// the index, sending indexes.VARIANT_INDEX_MAPPING as the explicit mapping.
// Every outcome is reported on stdout; the method returns nothing, so
// failures are logged rather than propagated.
func (i *IngestionService) MakeVariantIndex(c string) {
	var client = i.ElasticsearchClient
	var contigIndex = variantIndexName(c)

	res, err := client.Indices.Exists([]string{contigIndex})
	if err != nil {
		// The actual check didn't work properly (e.g., couldn't contact ES).
		// The response must not be touched on this path: it may be nil.
		fmt.Printf("Contig index %s existence-check got error: %s\n", c, err)
		return
	}
	if res.Body != nil {
		// Release the underlying connection once we are done with the response.
		defer res.Body.Close()
	}

	switch {
	case res.StatusCode == 404:
		// The index is missing, so create it with the fixed variant mapping.
		mappings, err := json.Marshal(indexes.VARIANT_INDEX_MAPPING)
		if err != nil {
			fmt.Printf("Contig index %s mapping serialization got error: %s\n", c, err)
			return
		}

		createRes, err := client.Indices.Create(
			contigIndex,
			client.Indices.Create.WithBody(strings.NewReader(fmt.Sprintf(`{"mappings": %s}`, mappings))),
		)
		if err != nil {
			// No usable response to print when the request itself failed.
			fmt.Printf("Creating contig index %s got error: %s\n", c, err)
			return
		}
		if createRes.Body != nil {
			defer createRes.Body.Close()
		}

		fmt.Printf("Creating contig index %s - got response: %s\n", c, createRes.String())
	case res.StatusCode >= 400:
		// The check reached ES but was rejected (auth failure, server error, ...):
		// we cannot tell whether the index exists, so do not claim that it does.
		fmt.Printf("Contig index %s existence-check got unexpected status: %d\n", c, res.StatusCode)
	default:
		// The check worked and the index already exists, so we shouldn't try to recreate it.
		fmt.Printf("Contig index %s already exists; skipping creation\n", c)
	}
}

// variantIndexName returns the name of the Elasticsearch index that holds the
// variants of the given contig: the lower-cased contig prefixed with "variants-".
func variantIndexName(contig string) string {
	lowered := strings.ToLower(contig)
	name := fmt.Sprintf("variants-%s", lowered)
	return name
}

0 comments on commit c5642a8

Please sign in to comment.