Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refact!: fixed mappings for variant indices #62

Merged
merged 17 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions etc/example.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

GOHAN_DEBUG=false
[email protected]
GOHAN_SEMVER=5.0.2
GOHAN_SEMVER=6.0.0
GOHAN_SERVICES="gateway api elasticsearch kibana drs authorization"

# GOOS=linux
Expand Down Expand Up @@ -39,8 +39,8 @@ GOHAN_API_IMAGE=gohan-api
GOHAN_API_VERSION=latest

GOHAN_API_BUILDER_BASE_IMAGE=golang:1.21-bookworm
GOHAN_API_DEV_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:golang-debian-2024.09.01
GOHAN_API_PROD_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:plain-debian-2024.09.01
GOHAN_API_DEV_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:golang-debian-2024.11.01
GOHAN_API_PROD_BASE_IMAGE=ghcr.io/bento-platform/bento_base_image:plain-debian-2024.11.01

GOHAN_API_CONTAINER_NAME=gohan-api
GOHAN_API_SERVICE_HOST=0.0.0.0
Expand Down
63 changes: 63 additions & 0 deletions src/api/models/indexes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,69 @@ type Genotype struct {
Zygosity c.Zygosity `json:"zygosity"`
}

var MAPPING_FIELDS_KEYWORD_IG256 = map[string]interface{}{
"keyword": map[string]interface{}{
"type": "keyword",
"ignore_above": 256,
},
}
var MAPPING_TEXT = map[string]interface{}{"type": "text", "fields": MAPPING_FIELDS_KEYWORD_IG256}
var MAPPING_LONG = map[string]interface{}{"type": "long"}
var MAPPING_FLOAT64 = map[string]interface{}{"type": "double"}
var MAPPING_BOOL = map[string]interface{}{"type": "boolean"}
var MAPPING_DATE = map[string]interface{}{"type": "date"}

// This mapping is derived from the one exported by Victor from the ICHANGE instance on 2024-11-01,
// using the following commands:
// ./bentoctl.bash shell gohan-api
// --> inside gohan-api container
// curl -u $GOHAN_ES_USERNAME:$GOHAN_ES_PASSWORD bentov2-gohan-elasticsearch:9200/_mapping
var VARIANT_INDEX_MAPPING = map[string]interface{}{
"properties": map[string]interface{}{
"chrom": MAPPING_TEXT,
"pos": MAPPING_LONG,
"id": MAPPING_TEXT,
"ref": MAPPING_TEXT,
"alt": MAPPING_TEXT,
"format": MAPPING_TEXT,
"qual": MAPPING_LONG,
"filter": MAPPING_TEXT,
"info": map[string]interface{}{
"properties": map[string]interface{}{
"id": MAPPING_TEXT,
"value": MAPPING_TEXT,
},
},
"sample": map[string]interface{}{
"properties": map[string]interface{}{
"id": MAPPING_TEXT,
"variation": map[string]interface{}{
"properties": map[string]interface{}{
"genotype": map[string]interface{}{
"properties": map[string]interface{}{
"phased": MAPPING_BOOL,
"zygosity": MAPPING_LONG,
},
},
"alleles": map[string]interface{}{
"properties": map[string]interface{}{
"left": MAPPING_TEXT,
"right": MAPPING_TEXT,
},
},
"phredScaleLikelyhood": MAPPING_LONG,
"genotypeProbability": MAPPING_FLOAT64,
},
},
},
},
"fileId": MAPPING_TEXT,
"dataset": MAPPING_TEXT,
"assemblyId": MAPPING_TEXT,
"createdTime": MAPPING_DATE,
},
}

type Gene struct {
Name string `json:"name"`
Chrom string `json:"chrom"`
Expand Down
63 changes: 50 additions & 13 deletions src/api/services/ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (i *IngestionService) Init() {
esutil.BulkIndexerItem{
// Action field configures the operation to perform (index, create, delete, update)
Action: "index",
Index: fmt.Sprintf("variants-%s", strings.ToLower(queuedVariant.Chrom)),
Index: variantIndexName(queuedVariant.Chrom),

// Body is an `io.Reader` with the payload
Body: bytes.NewReader(variantData),
Expand Down Expand Up @@ -369,6 +369,10 @@ func (i *IngestionService) ProcessVcf(
defer gr.Close()

scanner := bufio.NewScanner(gr)

contigs := make(map[string]struct{}) // To collect contigs as defined in VCF header
var contigMutex = sync.RWMutex{}

var discoveredHeaders bool = false
var headers []string
headerSampleIds := make(map[int]string)
Expand All @@ -382,28 +386,25 @@ func (i *IngestionService) ProcessVcf(
lineProcessingQueue := make(chan bool, lineProcessingConcurrencyLevel)

for scanner.Scan() {
//fmt.Println(scanner.Text())

// Gather Header row by seeking the CHROM string
// Collect contigs (chromosomes) to create indices
line := scanner.Text()
if !discoveredHeaders {
if line[0:6] == "#CHROM" {
// Split the string by tabs
headers = strings.Split(line, "\t")

for id, header := range headers {
for idx, header := range headers {
// determine if header is a default VCF header.
// if it is not, assume it's a sampleId and keep
// track of it with an id
if !utils.StringInSlice(strings.ToLower(strings.TrimSpace(strings.ReplaceAll(header, "#", ""))), constants.VcfHeaders) {
headerSampleIds[len(constants.VcfHeaders)-id] = header
headerSampleIds[len(constants.VcfHeaders)-idx] = header
}
}

discoveredHeaders = true

fmt.Println("Found the headers: ", headers)
continue
fmt.Printf("Found %d headers: %v\n", len(headers), headers)
}
continue
}
Expand Down Expand Up @@ -435,20 +436,30 @@ func (i *IngestionService) ProcessVcf(
var rowWg sync.WaitGroup
rowWg.Add(len(rowComponents))

for rowIndex, rowComponent := range rowComponents {
go func(i int, rc string, rwg *sync.WaitGroup) {
for colIdx, colVal := range rowComponents {
go func(h int, rc string, rwg *sync.WaitGroup) {
defer rwg.Done()
key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[i], "#", "", -1)))
key := strings.ToLower(strings.TrimSpace(strings.Replace(headers[h], "#", "", -1)))
value := strings.TrimSpace(rc)

// if not a vcf header, assume it's a sampleId header
if utils.StringInSlice(key, constants.VcfHeaders) {

// filter field type by column name
if key == "chrom" {
// Strip out all non-numeric characters
// Strip out chr prefix
value = strings.ReplaceAll(value, "chr", "")

// We're making contig indices on the fly - check if we haven't created the contig yet.
// If we haven't, create the index and add it to the contigs "set" (map).
contigMutex.Lock()
_, indexExists := contigs[value]
if !indexExists {
i.MakeVariantIndex(value)
contigs[value] = struct{}{} // add contig to the "set" of created configs
}
contigMutex.Unlock()

// ems if value is valid chromosome
if chromosome.IsValidHumanChromosome(value) {
tmpVariantMapMutex.Lock()
Expand Down Expand Up @@ -554,7 +565,7 @@ func (i *IngestionService) ProcessVcf(
})
tmpSamplesMutex.Unlock()
}
}(rowIndex, rowComponent, &rowWg)
}(colIdx, colVal, &rowWg)
}

rowWg.Wait()
Expand Down Expand Up @@ -865,3 +876,29 @@ func (i *IngestionService) FilenameAlreadyRunning(filename string) bool {
}
return false
}

func (i *IngestionService) MakeVariantIndex(c string) {
var client = i.ElasticsearchClient
var contigIndex = variantIndexName(c)

res, err := client.Indices.Exists([]string{contigIndex})
if res.StatusCode == 404 {
mappings, _ := json.Marshal(indexes.VARIANT_INDEX_MAPPING)
res, _ := client.Indices.Create(
contigIndex,
client.Indices.Create.WithBody(strings.NewReader(fmt.Sprintf(`{"mappings": %s}`, mappings))),
)

fmt.Printf("Creating contig index %s - got response: %s\n", c, res.String())
} else if err != nil {
// The actual check didn't work properly (e.g., couldn't contact ES).
fmt.Printf("Contig index %s existence-check got error: %s\n", c, err)
} else {
// The check worked and the index already exists, so we shouldn't try to recreate it.
fmt.Printf("Contig index %s already exists; skipping creation\n", c)
}
}

func variantIndexName(contig string) string {
return fmt.Sprintf("variants-%s", strings.ToLower(contig))
}
Loading