diff --git a/.gitignore b/.gitignore index dd455cc7..3a45bee7 100644 --- a/.gitignore +++ b/.gitignore @@ -46,4 +46,7 @@ bin/* */*/tmp *.vcf -*.vcf.gz \ No newline at end of file +*.vcf.gz + +*/*/*.csv +*/*/*.gtf* diff --git a/src/api/main.go b/src/api/main.go index 74763465..9ab54a98 100644 --- a/src/api/main.go +++ b/src/api/main.go @@ -124,13 +124,13 @@ func main() { e.GET("/variants/get/by/variantId", mvc.VariantsGetByVariantId, // middleware - gam.MandateChromosomeAttribute, + gam.ValidateOptionalChromosomeAttribute, gam.MandateCalibratedBounds, gam.MandateAssemblyIdAttribute, gam.ValidatePotentialGenotypeQueryParameter) e.GET("/variants/get/by/sampleId", mvc.VariantsGetBySampleId, // middleware - gam.MandateChromosomeAttribute, + gam.ValidateOptionalChromosomeAttribute, gam.MandateCalibratedBounds, gam.MandateAssemblyIdAttribute, gam.MandateSampleIdsPluralAttribute, @@ -138,13 +138,13 @@ func main() { e.GET("/variants/count/by/variantId", mvc.VariantsCountByVariantId, // middleware - gam.MandateChromosomeAttribute, + gam.ValidateOptionalChromosomeAttribute, gam.MandateCalibratedBounds, gam.MandateAssemblyIdAttribute, gam.ValidatePotentialGenotypeQueryParameter) e.GET("/variants/count/by/sampleId", mvc.VariantsCountBySampleId, // middleware - gam.MandateChromosomeAttribute, + gam.ValidateOptionalChromosomeAttribute, gam.MandateCalibratedBounds, gam.MandateAssemblyIdAttribute, gam.MandateSampleIdsSingularAttribute, @@ -155,6 +155,12 @@ func main() { gam.MandateAssemblyIdAttribute) e.GET("/variants/ingestion/requests", mvc.GetAllVariantIngestionRequests) + // -- Genes + e.GET("/genes/overview", mvc.GetGenesOverview) + e.GET("/genes/search", mvc.GenesGetByNomenclatureWildcard, + // middleware + gam.ValidateOptionalChromosomeAttribute) + // Run e.Logger.Fatal(e.Start(":" + cfg.Api.Port)) } diff --git a/src/api/middleware/chromosomeMiddleware.go b/src/api/middleware/chromosomeMiddleware.go index 67b63766..b55c4986 100644 --- 
a/src/api/middleware/chromosomeMiddleware.go +++ b/src/api/middleware/chromosomeMiddleware.go @@ -1,8 +1,8 @@ package middleware import ( + "api/models/constants/chromosome" "net/http" - "strconv" "github.com/labstack/echo" ) @@ -10,25 +10,16 @@ import ( /* Echo middleware to ensure a valid `chromosome` HTTP query parameter was provided */ -func MandateChromosomeAttribute(next echo.HandlerFunc) echo.HandlerFunc { +func ValidateOptionalChromosomeAttribute(next echo.HandlerFunc) echo.HandlerFunc { return func(c echo.Context) error { // check for chromosome query parameter chromQP := c.QueryParam("chromosome") - if len(chromQP) == 0 { - // if no id was provided return an error - return echo.NewHTTPError(http.StatusBadRequest, "Missing 'chromosome' query parameter for querying!") - } // verify: - i, conversionErr := strconv.Atoi(chromQP) - if conversionErr != nil { - // if invalid chromosome - return echo.NewHTTPError(http.StatusBadRequest, "Error converting 'chromosome' query parameter! Check your input") - } - - if i <= 0 { - // if chromosome less than 0 - return echo.NewHTTPError(http.StatusBadRequest, "Please provide a 'chromosome' greater than 0!") + if len(chromQP) > 0 && !chromosome.IsValidHumanChromosome(chromQP) { + // if chromosome less than 1 or greater than 23 + // and not 'x', 'y' or 'm' + return echo.NewHTTPError(http.StatusBadRequest, "Please provide a valid 'chromosome' (either 1-23, X, Y, or M)") } return next(c) diff --git a/src/api/models/constants/assembly-id/main.go b/src/api/models/constants/assembly-id/main.go index 0c396ff3..1885f6c9 100644 --- a/src/api/models/constants/assembly-id/main.go +++ b/src/api/models/constants/assembly-id/main.go @@ -11,6 +11,8 @@ const ( GRCh38 constants.AssemblyId = "GRCh38" GRCh37 constants.AssemblyId = "GRCh37" NCBI36 constants.AssemblyId = "NCBI36" + NCBI35 constants.AssemblyId = "NCBI35" + NCBI34 constants.AssemblyId = "NCBI34" Other constants.AssemblyId = "Other" ) @@ -22,6 +24,10 @@ func CastToAssemblyId(text 
string) constants.AssemblyId { return GRCh37 case "ncbi36": return NCBI36 + case "ncbi35": + return NCBI35 + case "ncbi34": + return NCBI34 + case "other": return Other default: diff --git a/src/api/models/constants/chromosome/main.go b/src/api/models/constants/chromosome/main.go new file mode 100644 index 00000000..81f20ecb --- /dev/null +++ b/src/api/models/constants/chromosome/main.go @@ -0,0 +1,37 @@ +package chromosome + +import ( + "strconv" + "strings" +) + +func IsValidHumanChromosome(text string) bool { + + // Check if number can be represented as an int and is non-zero + chromNumber, _ := strconv.Atoi(text) + if chromNumber > 0 { + // It can.. + // Check if it is in range 1-23 + if chromNumber < 24 { + return true + } + } else { + // No it can't.. + // Check if it is an X, Y.. + loweredText := strings.ToLower(text) + switch loweredText { + case "x": + return true + case "y": + return true + } + + // ..or M (MT) + switch strings.Contains(loweredText, "m") { + case true: + return true + } + } + + return false +} diff --git a/src/api/models/constants/main.go b/src/api/models/constants/main.go index 119f0663..a86e3849 100644 --- a/src/api/models/constants/main.go +++ b/src/api/models/constants/main.go @@ -7,6 +7,7 @@ package constants associated services.
*/ type AssemblyId string +type Chromosome string type GenotypeQuery string type SearchOperation string type SortDirection string diff --git a/src/api/models/dtos.go b/src/api/models/dtos.go index bcd23ddd..35b18bfc 100644 --- a/src/api/models/dtos.go +++ b/src/api/models/dtos.go @@ -12,3 +12,11 @@ type VariantResponseDataModel struct { Count int `json:"count"` Results []Variant `json:"results"` // []Variant } + +type GenesResponseDTO struct { + Status int `json:"status"` + Message string `json:"message"` + Term string `json:"term"` + Count int `json:"count"` + Results []Gene `json:"results"` // []Gene +} diff --git a/src/api/models/elasticsearch.go b/src/api/models/elasticsearch.go index 60bd6487..2a3548ca 100644 --- a/src/api/models/elasticsearch.go +++ b/src/api/models/elasticsearch.go @@ -7,7 +7,7 @@ import ( var VcfHeaders = []string{"chrom", "pos", "id", "ref", "alt", "qual", "filter", "info", "format"} type Variant struct { - Chrom int `json:"chrom"` + Chrom string `json:"chrom"` Pos int `json:"pos"` Id string `json:"id"` Ref []string `json:"ref"` @@ -45,3 +45,11 @@ type Genotype struct { AlleleRight int `json:"alleleRight"` // -1 = no call (equivalent to a '.') Zygosity c.Zygosity `json:"zygosity"` } + +type Gene struct { + Name string `json:"name"` + Chrom string `json:"chrom"` + Start int `json:"start"` + End int `json:"end"` + AssemblyId c.AssemblyId `json:"assemblyId"` +} diff --git a/src/api/models/ingest/structs/main.go b/src/api/models/ingest/structs/main.go new file mode 100644 index 00000000..d6010fd5 --- /dev/null +++ b/src/api/models/ingest/structs/main.go @@ -0,0 +1,16 @@ +package structs + +import ( + "api/models" + "sync" +) + +type IngestionQueueStructure struct { + Variant *models.Variant + WaitGroup *sync.WaitGroup +} + +type GeneIngestionQueueStructure struct { + Gene *models.Gene + WaitGroup *sync.WaitGroup +} diff --git a/src/api/mvc/genes.go b/src/api/mvc/genes.go new file mode 100644 index 00000000..2f8036eb --- /dev/null +++ 
b/src/api/mvc/genes.go @@ -0,0 +1,152 @@ +package mvc + +import ( + "api/contexts" + "api/models" + assemblyId "api/models/constants/assembly-id" + esRepo "api/repositories/elasticsearch" + "fmt" + "net/http" + "strconv" + "sync" + + "github.com/labstack/echo" + "github.com/mitchellh/mapstructure" +) + +func GenesGetByNomenclatureWildcard(c echo.Context) error { + cfg := c.(*contexts.GohanContext).Config + es := c.(*contexts.GohanContext).Es7Client + + // Chromosome search term + chromosomeSearchTerm := c.QueryParam("chromosome") + if len(chromosomeSearchTerm) == 0 { + // if no chromosome is provided, assume "wildcard" search + chromosomeSearchTerm = "*" + } + + // Name search term + term := c.QueryParam("term") + + // Assembly ID + // perform wildcard search if empty/random parameter is passed + // - set to Unknown to trigger it + assId := assemblyId.Unknown + if assemblyId.CastToAssemblyId(c.QueryParam("assemblyId")) != assemblyId.Unknown { + // retrieve passed parameter if is valid + assId = assemblyId.CastToAssemblyId(c.QueryParam("assemblyId")) + } + + // Size + var ( + size int = 25 + sizeCastErr error + ) + if len(c.QueryParam("size")) > 0 { + sizeQP := c.QueryParam("size") + size, sizeCastErr = strconv.Atoi(sizeQP) + if sizeCastErr != nil { + size = 25 + } + } + + fmt.Printf("Executing wildcard genes search for term %s, assemblyId %s (max size: %d)\n", term, assId, size) + + // Execute + docs := esRepo.GetGeneDocumentsByTermWildcard(cfg, es, chromosomeSearchTerm, term, assId, size) + + docsHits := docs["hits"].(map[string]interface{})["hits"] + allDocHits := []map[string]interface{}{} + mapstructure.Decode(docsHits, &allDocHits) + + // grab _source for each hit + var allSources []models.Gene + + for _, r := range allDocHits { + source := r["_source"].(map[string]interface{}) + + // cast map[string]interface{} to struct + var resultingVariant models.Gene + mapstructure.Decode(source, &resultingVariant) + + // accumulate structs + allSources = 
append(allSources, resultingVariant) + } + + fmt.Printf("Found %d docs!\n", len(allSources)) + + geneResponseDTO := models.GenesResponseDTO{ + Term: term, + Count: len(allSources), + Results: allSources, + Status: 200, + Message: "Success", + } + + return c.JSON(http.StatusOK, geneResponseDTO) +} + +func GetGenesOverview(c echo.Context) error { + + resultsMap := map[string]interface{}{} + resultsMux := sync.RWMutex{} + + es := c.(*contexts.GohanContext).Es7Client + cfg := c.(*contexts.GohanContext).Config + + // retrieve aggregation of genes/chromosomes by assembly id + results := esRepo.GetGeneBucketsByKeyword(cfg, es) + + // begin mapping results + geneChromosomeGroupBucketsMapped := []map[string]interface{}{} + + // loop over top level aggregation and + // accumulated nested aggregations + if aggs, ok := results["aggregations"]; ok { + aggsMapped := aggs.(map[string]interface{}) + + if items, ok := aggsMapped["genes_assembly_id_group"]; ok { + itemsMapped := items.(map[string]interface{}) + + if buckets := itemsMapped["buckets"]; ok { + arrayMappedBuckets := buckets.([]interface{}) + + for _, mappedBucket := range arrayMappedBuckets { + geneChromosomeGroupBucketsMapped = append(geneChromosomeGroupBucketsMapped, mappedBucket.(map[string]interface{})) + } + } + } + } + + individualAssemblyIdKeyMap := map[string]interface{}{} + + // iterated over each assemblyId bucket + for _, chromGroupBucketMap := range geneChromosomeGroupBucketsMapped { + + assemblyIdKey := fmt.Sprint(chromGroupBucketMap["key"]) + + numGenesPerChromMap := map[string]interface{}{} + bucketsMapped := map[string]interface{}{} + + if chromGroupItem, ok := chromGroupBucketMap["genes_chromosome_group"]; ok { + chromGroupItemMapped := chromGroupItem.(map[string]interface{}) + + for _, chromBucket := range chromGroupItemMapped["buckets"].([]interface{}) { + doc_key := fmt.Sprint(chromBucket.(map[string]interface{})["key"]) // ensure strings and numbers are expressed as strings + doc_count := 
chromBucket.(map[string]interface{})["doc_count"] + + // add to list of buckets by chromosome + bucketsMapped[doc_key] = doc_count + } + } + + numGenesPerChromMap["numberOfGenesPerChromosome"] = bucketsMapped + individualAssemblyIdKeyMap[assemblyIdKey] = numGenesPerChromMap + } + + resultsMux.Lock() + resultsMap["assemblyIDs"] = individualAssemblyIdKeyMap + resultsMux.Unlock() + + return c.JSON(http.StatusOK, resultsMap) +} diff --git a/src/api/mvc/variants.go b/src/api/mvc/variants.go index 6ffe7045..8eb312f6 100644 --- a/src/api/mvc/variants.go +++ b/src/api/mvc/variants.go @@ -249,7 +249,7 @@ func GetVariantsOverview(c echo.Context) error { callGetBucketsByKeyword := func(key string, keyword string, _wg *sync.WaitGroup) { defer _wg.Done() - results := esRepo.GetBucketsByKeyword(cfg, es, keyword) + results := esRepo.GetVariantsBucketsByKeyword(cfg, es, keyword) // retrieve aggregations.items.buckets bucketsMapped := []interface{}{} @@ -281,7 +281,7 @@ func GetVariantsOverview(c echo.Context) error { // get distribution of chromosomes wg.Add(1) - go callGetBucketsByKeyword("chromosomes", "chrom", &wg) + go callGetBucketsByKeyword("chromosomes", "chrom.keyword", &wg) // get distribution of variant IDs wg.Add(1) diff --git a/src/api/repositories/elasticsearch/genes.go b/src/api/repositories/elasticsearch/genes.go new file mode 100644 index 00000000..d7133ef8 --- /dev/null +++ b/src/api/repositories/elasticsearch/genes.go @@ -0,0 +1,201 @@ +package elasticsearch + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "log" + "net/http" + "time" + + "api/models" + "api/models/constants" + assemblyId "api/models/constants/assembly-id" + + "github.com/elastic/go-elasticsearch" +) + +const genesIndex = "genes" + +func GetGeneBucketsByKeyword(cfg *models.Config, es *elasticsearch.Client) map[string]interface{} { + // begin building the request body. 
+ var buf bytes.Buffer + aggMap := map[string]interface{}{ + "size": "0", + "aggs": map[string]interface{}{ + "genes_assembly_id_group": map[string]interface{}{ + "terms": map[string]interface{}{ + "field": "assemblyId.keyword", + "size": "10000", // increases the number of buckets returned (default is 10) + "order": map[string]string{ + "_key": "asc", + }, + }, + "aggs": map[string]interface{}{ + "genes_chromosome_group": map[string]interface{}{ + "terms": map[string]interface{}{ + "field": "chrom.keyword", + "size": "10000", // increases the number of buckets returned (default is 10) + "order": map[string]string{ + "_key": "asc", + }, + }, + }, + }, + }, + }, + } + + // encode the query + if err := json.NewEncoder(&buf).Encode(aggMap); err != nil { + log.Fatalf("Error encoding aggMap: %s\n", err) + } + + if cfg.Debug { + // view the outbound elasticsearch query + myString := string(buf.Bytes()[:]) + fmt.Println(myString) + } + + // TEMP: SECURITY RISK + http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + // + // Perform the search request. + res, searchErr := es.Search( + es.Search.WithContext(context.Background()), + es.Search.WithIndex(genesIndex), + es.Search.WithBody(&buf), + es.Search.WithTrackTotalHits(true), + es.Search.WithPretty(), + ) + if searchErr != nil { + fmt.Printf("Error getting response: %s\n", searchErr) + } + + defer res.Body.Close() + + resultString := res.String() + if cfg.Debug { + fmt.Println(resultString) + } + + // Declared an empty interface + result := make(map[string]interface{}) + + // Unmarshal or Decode the JSON to the interface. 
+ // Known bug: response comes back with a preceding '[200 OK] ' which needs trimming (hence the [9:]) + umErr := json.Unmarshal([]byte(resultString[9:]), &result) + if umErr != nil { + fmt.Printf("Error unmarshalling response: %s\n", umErr) + } + + fmt.Printf("Query End: %s\n", time.Now()) + + return result +} + +func GetGeneDocumentsByTermWildcard(cfg *models.Config, es *elasticsearch.Client, + chromosomeSearchTerm string, term string, assId constants.AssemblyId, size int) map[string]interface{} { + + // TEMP: SECURITY RISK + http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + // + + // Nomenclature Search Term + nomenclatureStringTerm := fmt.Sprintf("*%s*", term) + + // Assembly Id Search Term (wildcard by default) + assemblyIdStringTerm := "*" + if assId != assemblyId.Unknown { + assemblyIdStringTerm = string(assId) + } + + var buf bytes.Buffer + query := map[string]interface{}{ + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "filter": []map[string]interface{}{{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{ + { + "query_string": map[string]interface{}{ + "fields": []string{"chrom"}, + "query": chromosomeSearchTerm, + }, + }, + { + "query_string": map[string]interface{}{ + "fields": []string{"name"}, + "query": nomenclatureStringTerm, + }, + }, + { + "query_string": map[string]interface{}{ + "fields": []string{"assemblyId"}, + "query": assemblyIdStringTerm, + }, + }, + }, + }, + }}, + }, + }, + "size": size, + "sort": []map[string]interface{}{ + { + "chrom.keyword": map[string]interface{}{ + "order": "asc", + }, + }, + { + "start": map[string]interface{}{ + "order": "asc", + }, + }, + }, + } + + // encode the query + if err := json.NewEncoder(&buf).Encode(query); err != nil { + log.Fatalf("Error encoding query: %s\n", err) + } + + if cfg.Debug { + // view the outbound elasticsearch query + myString := string(buf.Bytes()[:]) + fmt.Println(myString) + } + + // Perform the 
search request. + searchRes, searchErr := es.Search( + es.Search.WithContext(context.Background()), + es.Search.WithIndex(genesIndex), + es.Search.WithBody(&buf), + es.Search.WithTrackTotalHits(true), + es.Search.WithPretty(), + ) + if searchErr != nil { + fmt.Printf("Error getting response: %s\n", searchErr) + } + + defer searchRes.Body.Close() + + resultString := searchRes.String() + if cfg.Debug { + fmt.Println(resultString) + } + + // Prepare an empty interface + result := make(map[string]interface{}) + + // Unmarshal or Decode the JSON to the empty interface. + // Known bug: response comes back with a preceding '[200 OK] ' which needs trimming (hence the [9:]) + umErr := json.Unmarshal([]byte(resultString[9:]), &result) + if umErr != nil { + fmt.Printf("Error unmarshalling gene search response: %s\n", umErr) + } + + return result +} diff --git a/src/api/repositories/elasticsearch/main.go b/src/api/repositories/elasticsearch/variants.go similarity index 97% rename from src/api/repositories/elasticsearch/main.go rename to src/api/repositories/elasticsearch/variants.go index 41cd120c..7da69be4 100644 --- a/src/api/repositories/elasticsearch/main.go +++ b/src/api/repositories/elasticsearch/variants.go @@ -19,6 +19,8 @@ import ( "github.com/elastic/go-elasticsearch" ) +const variantsIndex = "variants" + func GetDocumentsContainerVariantOrSampleIdInPositionRange(cfg *models.Config, es *elasticsearch.Client, chromosome string, lowerBound int, upperBound int, variantId string, sampleId string, @@ -213,7 +215,7 @@ func GetDocumentsContainerVariantOrSampleIdInPositionRange(cfg *models.Config, e // Perform the search request. 
res, searchErr := es.Search( es.Search.WithContext(context.Background()), - es.Search.WithIndex("variants"), + es.Search.WithIndex(variantsIndex), es.Search.WithBody(&buf), es.Search.WithTrackTotalHits(true), es.Search.WithPretty(), @@ -423,7 +425,7 @@ func CountDocumentsContainerVariantOrSampleIdInPositionRange(cfg *models.Config, // Perform the search request. res, searchErr := es.Count( es.Count.WithContext(context.Background()), - es.Count.WithIndex("variants"), + es.Count.WithIndex(variantsIndex), es.Count.WithBody(&buf), es.Count.WithPretty(), ) @@ -453,8 +455,7 @@ func CountDocumentsContainerVariantOrSampleIdInPositionRange(cfg *models.Config, return result } -func GetBucketsByKeyword(cfg *models.Config, es *elasticsearch.Client, keyword string) map[string]interface{} { - +func GetVariantsBucketsByKeyword(cfg *models.Config, es *elasticsearch.Client, keyword string) map[string]interface{} { // begin building the request body. var buf bytes.Buffer aggMap := map[string]interface{}{ @@ -464,6 +465,9 @@ func GetBucketsByKeyword(cfg *models.Config, es *elasticsearch.Client, keyword s "terms": map[string]interface{}{ "field": keyword, "size": "10000", // increases the number of buckets returned (default is 10) + "order": map[string]string{ + "_key": "asc", + }, }, }, }, @@ -486,7 +490,7 @@ func GetBucketsByKeyword(cfg *models.Config, es *elasticsearch.Client, keyword s // Perform the search request. 
res, searchErr := es.Search( es.Search.WithContext(context.Background()), - es.Search.WithIndex("variants"), + es.Search.WithIndex(variantsIndex), es.Search.WithBody(&buf), es.Search.WithTrackTotalHits(true), es.Search.WithPretty(), diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index 282d971a..0ed969e3 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -3,8 +3,10 @@ package services import ( "api/models" "api/models/constants" + "api/models/constants/chromosome" z "api/models/constants/zygosity" "api/models/ingest" + "api/models/ingest/structs" "api/utils" "bufio" "bytes" @@ -20,7 +22,6 @@ import ( "net/http" "os" "path/filepath" - "regexp" "strconv" "strings" "sync" @@ -34,18 +35,15 @@ import ( type ( IngestionService struct { - Initialized bool - IngestRequestChan chan *ingest.IngestRequest - IngestRequestMap map[string]*ingest.IngestRequest - IngestionBulkIndexingCapacity int - IngestionBulkIndexingQueue chan *IngestionQueueStructure - ElasticsearchClient *elasticsearch.Client - IngestionBulkIndexer esutil.BulkIndexer - } - - IngestionQueueStructure struct { - Variant *models.Variant - WaitGroup *sync.WaitGroup + Initialized bool + IngestRequestChan chan *ingest.IngestRequest + IngestRequestMap map[string]*ingest.IngestRequest + IngestionBulkIndexingCapacity int + ElasticsearchClient *elasticsearch.Client + IngestionBulkIndexingQueue chan *structs.IngestionQueueStructure + IngestionBulkIndexer esutil.BulkIndexer + GeneIngestionBulkIndexingQueue chan *structs.GeneIngestionQueueStructure + GeneIngestionBulkIndexer esutil.BulkIndexer } ) @@ -54,29 +52,38 @@ const defaultBulkIndexingCap int = 10000 func NewIngestionService(es *elasticsearch.Client) *IngestionService { iz := &IngestionService{ - Initialized: false, - IngestRequestChan: make(chan *ingest.IngestRequest), - IngestRequestMap: map[string]*ingest.IngestRequest{}, - IngestionBulkIndexingCapacity: defaultBulkIndexingCap, - 
IngestionBulkIndexingQueue: make(chan *IngestionQueueStructure, defaultBulkIndexingCap), - ElasticsearchClient: es, + Initialized: false, + IngestRequestChan: make(chan *ingest.IngestRequest), + IngestRequestMap: map[string]*ingest.IngestRequest{}, + IngestionBulkIndexingCapacity: defaultBulkIndexingCap, + IngestionBulkIndexingQueue: make(chan *structs.IngestionQueueStructure, defaultBulkIndexingCap), + GeneIngestionBulkIndexingQueue: make(chan *structs.GeneIngestionQueueStructure, 10), + ElasticsearchClient: es, } - // see: https://www.elastic.co/blog/why-am-i-seeing-bulk-rejections-in-my-elasticsearch-cluster + //see: https://www.elastic.co/blog/why-am-i-seeing-bulk-rejections-in-my-elasticsearch-cluster var numWorkers = defaultBulkIndexingCap / 100 - // the lower the denominator (the number of documents per bulk upload). the higher - // the chances of 100% successful upload, though the longer it may take (negligible) + //the lower the denominator (the number of documents per bulk upload). the higher + //the chances of 100% successful upload, though the longer it may take (negligible) bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Index: "variants", Client: iz.ElasticsearchClient, NumWorkers: numWorkers, - // FlushBytes: int(flushBytes), // The flush threshold in bytes (default: 50MB ?) - FlushInterval: 30 * time.Second, // The periodic flush interval + // FlushBytes: int(flushBytes), // The flush threshold in bytes (default: 5MB ?) + FlushInterval: time.Second, // The periodic flush interval }) - iz.IngestionBulkIndexer = bi + gbi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Index: "genes", + Client: iz.ElasticsearchClient, + NumWorkers: numWorkers, + //FlushBytes: int(64), // The flush threshold in bytes (default: 5MB ?) 
+ FlushInterval: 3 * time.Second, // The periodic flush interval + }) + iz.GeneIngestionBulkIndexer = gbi + iz.Init() return iz @@ -100,7 +107,7 @@ func (i *IngestionService) Init() { } }() - // spin up a listener for bulk indexing + // spin up a listener for each bulk indexing go func() { for { select { @@ -117,6 +124,57 @@ func (i *IngestionService) Init() { // Add an item to the BulkIndexer err = i.IngestionBulkIndexer.Add( + context.Background(), + esutil.BulkIndexerItem{ + // Action field configures the operation to perform (index, create, delete, update) + Action: "index", + + // Body is an `io.Reader` with the payload + Body: bytes.NewReader(data), + + // OnSuccess is called for each successful operation + OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { + defer wg.Done() + //fmt.Printf("Successfully indexed: %s", item) + //atomic.AddUint64(&countSuccessful, 1) + }, + + // OnFailure is called for each failed operation + OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { + defer wg.Done() + //atomic.AddUint64(&countFailed, 1) + if err != nil { + fmt.Printf("ERROR: %s", err) + } else { + fmt.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason) + } + }, + }, + ) + if err != nil { + defer wg.Done() + fmt.Printf("Unexpected error: %s", err) + } + } + } + }() + + go func() { + for { + select { + case queuedItem := <-i.GeneIngestionBulkIndexingQueue: + + g := queuedItem.Gene + wg := queuedItem.WaitGroup + + // Prepare the data payload: encode article to JSON + data, err := json.Marshal(g) + if err != nil { + log.Fatalf("Cannot encode gene %s: %s\n", g, err) + } + + // Add an item to the BulkIndexer + err = i.GeneIngestionBulkIndexer.Add( context.Background(), esutil.BulkIndexerItem{ // Action field configures the operation to perform (index, create, delete, update) @@ -134,6 +192,7 @@ func (i *IngestionService) Init() { // OnFailure is called 
for each failed operation OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { defer wg.Done() + fmt.Printf("Failure Response: %s", res.Error) //atomic.AddUint64(&countFailed, 1) if err != nil { fmt.Printf("ERROR: %s", err) } else { fmt.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason) } }, @@ -241,8 +300,6 @@ func (i *IngestionService) ProcessVcf(vcfFilePath string, drsFileId string, asse var discoveredHeaders bool = false var headers []string - nonNumericRegexp := regexp.MustCompile("[^.0-9]") - var _fileWG sync.WaitGroup for scanner.Scan() { @@ -292,11 +349,21 @@ func (i *IngestionService) ProcessVcf(vcfFilePath string, drsFileId string, asse if utils.StringInSlice(key, models.VcfHeaders) { // filter field type by column name - if key == "chrom" || key == "pos" || key == "qual" { - if key == "chrom" { - // Strip out all non-numeric characters - value = nonNumericRegexp.ReplaceAllString(value, "") + if key == "chrom" { + // Strip out any "chr" prefix + value = strings.ReplaceAll(value, "chr", "") + + // check if value is a valid chromosome + if chromosome.IsValidHumanChromosome(value) { + tmpVariantMapMutex.Lock() + tmpVariant[key] = value + tmpVariantMapMutex.Unlock() + } else { + tmpVariantMapMutex.Lock() + tmpVariant[key] = "err" + tmpVariantMapMutex.Unlock() } + } else if key == "pos" || key == "qual" { // // Convert string's to int's, if possible value, err := strconv.ParseInt(value, 10, 0) @@ -489,7 +556,7 @@ func (i *IngestionService) ProcessVcf(vcfFilePath string, drsFileId string, asse mapstructure.Decode(tmpVariant, &resultingVariant) // pass variant (along with a waitgroup) to the channel - i.IngestionBulkIndexingQueue <- &IngestionQueueStructure{ + i.IngestionBulkIndexingQueue <- &structs.IngestionQueueStructure{ Variant: &resultingVariant, WaitGroup: fileWg, } diff --git a/src/api/utils/connections.go b/src/api/utils/connections.go index 79d2e789..1e350cec 100644 --- a/src/api/utils/connections.go +++ b/src/api/utils/connections.go @@ -39,7 +39,7 @@ 
func CreateEsConnection(cfg *models.Config) *es7.Client { es7Client, _ := es7.NewClient(esCfg) - fmt.Printf("Using ES7 Client Version %s", es7.Version) + fmt.Printf("Using ES7 Client Version %s\n", es7.Version) return es7Client } diff --git a/src/gota-poc/main.go b/src/gota-poc/main.go new file mode 100644 index 00000000..ae812f4b --- /dev/null +++ b/src/gota-poc/main.go @@ -0,0 +1,260 @@ +package main + +import ( + "api/models" + "api/models/constants" + assemblyId "api/models/constants/assembly-id" + "api/models/constants/chromosome" + "api/models/ingest/structs" + "api/services" + "api/utils" + "bufio" + "compress/gzip" + "crypto/tls" + "fmt" + "io" + "log" + "net/http" + "net/url" + "os" + "strconv" + "strings" + "sync" + + "github.com/kelseyhightower/envconfig" +) + +func main() { + + // Gather environment variables + var cfg models.Config + err := envconfig.Process("", &cfg) + if err != nil { + fmt.Println(err) + os.Exit(2) + } + + // TEMP: SECURITY RISK + http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + // + + // Service Connections: + // -- Elasticsearch + es := utils.CreateEsConnection(&cfg) + iz := services.NewIngestionService(es) + iz.Init() + + assemblyIdMap := map[constants.AssemblyId]string{ + assemblyId.GRCh38: "gencode.v38.annotation.gtf", + assemblyId.GRCh37: "gencode.v19.annotation.gtf", + // SKIP + // assemblyId.NCBI36: "hg18", + // assemblyId.NCBI35: "hg17", + // assemblyId.NCBI34: "hg16", + } + assemblyIdGTFUrlMap := map[constants.AssemblyId]string{ + assemblyId.GRCh38: "http://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_38/gencode.v38.annotation.gtf.gz", + assemblyId.GRCh37: "http://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_19/gencode.v19.annotation.gtf.gz", + // SKIP + // assemblyId.NCBI36: "", + // assemblyId.NCBI35: "", + // assemblyId.NCBI34: "", + } + + var geneWg sync.WaitGroup + + for assId, fileName := range assemblyIdMap { + // Read one file at a time + + 
gtfFile, err := os.Open(fileName) + if err != nil { + // log.Fatalf("failed to open file: %s", err) + // Download the file + fullURLFile := assemblyIdGTFUrlMap[assId] + + // Build fileName from fullPath + fileURL, err := url.Parse(fullURLFile) + if err != nil { + log.Fatal(err) + } + path := fileURL.Path + segments := strings.Split(path, "/") + fileName = segments[len(segments)-1] + + // Create blank file + file, err := os.Create(fileName) + if err != nil { + log.Fatal(err) + } + client := http.Client{ + CheckRedirect: func(r *http.Request, via []*http.Request) error { + r.URL.Opaque = r.URL.Path + return nil + }, + } + fmt.Printf("Downloading file %s ...\n", fileName) + + // Put content on file + resp, err := client.Get(fullURLFile) + if err != nil { + log.Fatal(err) + } + defer resp.Body.Close() + + size, err := io.Copy(file, resp.Body) + if err != nil { + log.Fatal(err) + } + defer file.Close() + + fmt.Printf("Downloaded a file %s with size %d\n", fileName, size) + + fmt.Printf("Unzipping %s...\n", fileName) + gzipfile, err := os.Open(fileName) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + reader, err := gzip.NewReader(gzipfile) + if err != nil { + fmt.Println(err) + os.Exit(1) + } + defer reader.Close() + + newfilename := strings.TrimSuffix(fileName, ".gz") + + writer, err := os.Create(newfilename) + + if err != nil { + fmt.Println(err) + os.Exit(1) + } + + defer writer.Close() + + if _, err = io.Copy(writer, reader); err != nil { + fmt.Println(err) + os.Exit(1) + } + + fmt.Printf("Opening %s\n", newfilename) + gtfFile, _ = os.Open(newfilename) + + fmt.Printf("Deleting %s\n", fileName) + err = os.Remove(fileName) + if err != nil { + fmt.Println(err) + } + } + + defer gtfFile.Close() + + fileScanner := bufio.NewScanner(gtfFile) + fileScanner.Split(bufio.ScanLines) + + fmt.Printf("Ingesting %s\n", string(assId)) + + var ( + chromHeaderKey = 0 + startKey = 3 + endKey = 4 + nameHeaderKeys = []int{3} + geneNameHeaderKeys []int + ) + + var columnsToPrint 
[]string + if assId == assemblyId.GRCh38 { + // GRCh38 dataset has multiple name fields (name, name2) and + // also includes gene name fields (geneName, geneName2) + columnsToPrint = append(columnsToPrint, "#chrom", "chromStart", "chromEnd", "name", "name2", "geneName", "geneName2") + nameHeaderKeys = append(nameHeaderKeys, 4) + geneNameHeaderKeys = append(geneNameHeaderKeys, 5, 6) + } else { + columnsToPrint = append(columnsToPrint, "chrom", "txStart", "txEnd", "#name") + } + + for fileScanner.Scan() { + rowText := fileScanner.Text() + if rowText[:2] == "##" { + // Skip header rows + continue + } + + geneWg.Add(1) + go func(rowText string, _chromHeaderKey int, + _startKey int, _endKey int, + _nameHeaderKeys []int, _geneNameHeaderKeys []int, + _assId constants.AssemblyId, + _gwg *sync.WaitGroup) { + // fmt.Printf("row : %s\n", row) + + var ( + start int + end int + geneName string + ) + + rowSplits := strings.Split(rowText, "\t") + + // skip this row if it's not a gene row + // i.e, if it's an exon or transcript + if rowSplits[2] != "gene" { + defer _gwg.Done() + return + } + + //clean chromosome + chromosomeClean := strings.ReplaceAll(rowSplits[_chromHeaderKey], "chr", "") + + if !chromosome.IsValidHumanChromosome(chromosomeClean) { + defer _gwg.Done() + return + } + // http://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_38/gencode.v38.annotation.gtf.gz + + // clean start/end + chromStartClean := strings.ReplaceAll(strings.ReplaceAll(rowSplits[_startKey], ",", ""), " ", "") + start, _ = strconv.Atoi(chromStartClean) + + chromEndClean := strings.ReplaceAll(strings.ReplaceAll(rowSplits[_endKey], ",", ""), " ", "") + end, _ = strconv.Atoi(chromEndClean) + + dataClumpSplits := strings.Split(rowSplits[len(rowSplits)-1], ";") + for _, v := range dataClumpSplits { + if strings.Contains(v, "gene_name") { + cleanedItemSplits := strings.Split(strings.TrimSpace(strings.ReplaceAll(v, "\"", "")), " ") + if len(cleanedItemSplits) > 0 { + geneName = 
cleanedItemSplits[len(cleanedItemSplits)-1] + } + break + } + } + if len(geneName) == 0 { + fmt.Printf("No gene found in row %s\n", rowText) + return + } + + discoveredGene := &models.Gene{ + Name: geneName, + Chrom: chromosomeClean, + Start: start, + End: end, + AssemblyId: _assId, + } + + //fmt.Printf("Keys :%d, %d, %d, %d, %d -- %s\n", _chromHeaderKey, _startKey, _endKey, _nameHeaderKeys, _geneNameHeaderKeys, discoveredGene) + + iz.GeneIngestionBulkIndexingQueue <- &structs.GeneIngestionQueueStructure{ + Gene: discoveredGene, + WaitGroup: _gwg, + } + }(rowText, chromHeaderKey, startKey, endKey, nameHeaderKeys, geneNameHeaderKeys, assId, &geneWg) + + // fmt.Printf("Stats : %d\n", iz.GeneIngestionBulkIndexer.Stats()) + } + geneWg.Wait() + + } +} diff --git a/src/tests/integration/api/api_gene_test.go b/src/tests/integration/api/api_gene_test.go new file mode 100644 index 00000000..103f3bc2 --- /dev/null +++ b/src/tests/integration/api/api_gene_test.go @@ -0,0 +1,182 @@ +package api + +import ( + "api/models" + c "api/models/constants" + a "api/models/constants/assembly-id" + "api/models/constants/chromosome" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "sync" + "testing" + common "tests/common" + + . 
"github.com/ahmetb/go-linq" + + "github.com/stretchr/testify/assert" +) + +const ( + GenesOverviewPath string = "%s/genes/overview" + GenesSearchPathWithQueryString string = "%s/genes/search%s" +) + +func TestGenesOverview(t *testing.T) { + cfg := common.InitConfig() + + overviewJson := getGenesOverview(t, cfg) + assert.NotNil(t, overviewJson) +} + +func TestCanGetGenesByAssemblyIdAndChromosome(t *testing.T) { + // retrieve all possible combinations of responses + allDtoResponses := getAllDtosOfVariousCombinationsOfGenesAndAssemblyIDs(t) + + // assert the dto response slice is plentiful + assert.NotNil(t, allDtoResponses) + + From(allDtoResponses).ForEachT(func(dto models.GenesResponseDTO) { + // ensure there are results in the response + assert.NotNil(t, dto.Results) + + // check the resulting data + From(dto.Results).ForEachT(func(gene models.Gene) { + // ensure the gene is legit + assert.NotNil(t, gene.Name) + assert.NotNil(t, gene.AssemblyId) + assert.True(t, chromosome.IsValidHumanChromosome(gene.Chrom)) + assert.Greater(t, gene.End, gene.Start) + }) + }) +} + +func getAllDtosOfVariousCombinationsOfGenesAndAssemblyIDs(_t *testing.T) []models.GenesResponseDTO { + cfg := common.InitConfig() + + // retrieve the overview + overviewJson := getGenesOverview(_t, cfg) + + // ensure the response is valid + // TODO: error check instead of nil check + assert.NotNil(_t, overviewJson) + + // initialize a common slice in which to + // accumulate al responses asynchronously + allDtoResponses := []models.GenesResponseDTO{} + allDtoResponsesMux := sync.RWMutex{} + + var combWg sync.WaitGroup + for _, assemblyIdOverviewBucket := range overviewJson { + + // range over all assembly IDs + for assemblyIdString, genesPerChromosomeBucket := range assemblyIdOverviewBucket.(map[string]interface{}) { + + fmt.Println(assemblyIdString) + fmt.Println(genesPerChromosomeBucket) + + castedBucket := 
genesPerChromosomeBucket.(map[string]interface{})["numberOfGenesPerChromosome"].(map[string]interface{}) + + for chromosomeString, _ := range castedBucket { // _ = number of genes (unused) + + combWg.Add(1) + go func(_wg *sync.WaitGroup, _assemblyIdString string, _chromosomeString string) { + defer _wg.Done() + + assemblyId := a.CastToAssemblyId(_assemblyIdString) + + // make the call + dto := buildQueryAndMakeGetGenesCall(_chromosomeString, "", assemblyId, _t, cfg) + + // ensure there is data returned + // (we'd be making a bad query, otherwise) + assert.True(_t, len(dto.Results) > 0) + + // accumulate all response objects + // to a common slice in an + // asynchronous-safe manner + allDtoResponsesMux.Lock() + allDtoResponses = append(allDtoResponses, dto) + allDtoResponsesMux.Unlock() + }(&combWg, assemblyIdString, chromosomeString) + } + + } + + } + combWg.Wait() + + return allDtoResponses +} + +func getGenesOverview(_t *testing.T, _cfg *models.Config) map[string]interface{} { + request, _ := http.NewRequest("GET", fmt.Sprintf(GenesOverviewPath, _cfg.Api.Url), nil) + + client := &http.Client{} + response, responseErr := client.Do(request) + assert.Nil(_t, responseErr) + + defer response.Body.Close() + + // this test (at the time of writing) will only work if authorization is disabled + shouldBe := 200 + assert.Equal(_t, shouldBe, response.StatusCode, fmt.Sprintf("Error -- Api GET / Status: %s ; Should be %d", response.Status, shouldBe)) + + // -- interpret array of ingestion requests from response + overviewRespBody, overviewRespBodyErr := ioutil.ReadAll(response.Body) + assert.Nil(_t, overviewRespBodyErr) + + // --- transform body bytes to string + overviewRespBodyString := string(overviewRespBody) + + // -- check for json error + var overviewRespJson map[string]interface{} + overviewJsonUnmarshallingError := json.Unmarshal([]byte(overviewRespBodyString), &overviewRespJson) + assert.Nil(_t, overviewJsonUnmarshallingError) + + // -- insure it's an empty array + 
assemblyIDsKey, assidkOk := overviewRespJson["assemblyIDs"] + assert.True(_t, assidkOk) + assert.NotNil(_t, assemblyIDsKey) + + return overviewRespJson +} + +func buildQueryAndMakeGetGenesCall(chromosome string, term string, assemblyId c.AssemblyId, _t *testing.T, _cfg *models.Config) models.GenesResponseDTO { + + queryString := fmt.Sprintf("?chromosome=%s&assemblyId=%s", chromosome, assemblyId) + + url := fmt.Sprintf(GenesSearchPathWithQueryString, _cfg.Api.Url, queryString) + + return getGetGenesCall(url, _t) +} + +func getGetGenesCall(url string, _t *testing.T) models.GenesResponseDTO { + fmt.Printf("Calling %s\n", url) + request, _ := http.NewRequest("GET", url, nil) + + client := &http.Client{} + response, responseErr := client.Do(request) + assert.Nil(_t, responseErr) + + defer response.Body.Close() + + // this test (at the time of writing) will only work if authorization is disabled + shouldBe := 200 + assert.Equal(_t, shouldBe, response.StatusCode, fmt.Sprintf("Error -- Api GET %s Status: %s ; Should be %d", url, response.Status, shouldBe)) + + // -- interpret array of ingestion requests from response + respBody, respBodyErr := ioutil.ReadAll(response.Body) + assert.Nil(_t, respBodyErr) + + // --- transform body bytes to string + respBodyString := string(respBody) + + // -- convert to json and check for error + var respDto models.GenesResponseDTO + jsonUnmarshallingError := json.Unmarshal([]byte(respBodyString), &respDto) + assert.Nil(_t, jsonUnmarshallingError) + + return respDto +} diff --git a/src/tests/integration/api/api_test.go b/src/tests/integration/api/api_variant_test.go similarity index 100% rename from src/tests/integration/api/api_test.go rename to src/tests/integration/api/api_variant_test.go