diff --git a/etc/example.env b/etc/example.env index 77873013..7aa87cfb 100644 --- a/etc/example.env +++ b/etc/example.env @@ -47,6 +47,7 @@ GOHAN_API_CONTAINERIZED_VCF_PATH=/app/vcfs GOHAN_API_GTF_PATH=/path/to/gtfs/on/host/machine GOHAN_API_CONTAINERIZED_GTF_PATH=/app/gtfs +GOHAN_API_BULK_INDEXING_CAP=10000 GOHAN_API_FILE_PROC_CONC_LVL=3 GOHAN_API_LINE_PROC_CONC_LVL=1000 diff --git a/src/api/main.go b/src/api/main.go index 52c49a29..3cff4811 100644 --- a/src/api/main.go +++ b/src/api/main.go @@ -35,6 +35,7 @@ func main() { "\tVCF Directory Path : %s \n"+ "\tGTF Directory Path : %s \n"+ + "\tBulk Indexing Cap : %d\n"+ "\tFile Processing Concurrency Level : %d\n"+ "\tLine Processing Concurrency Level : %d\n"+ "\tElasticsearch Url : %s \n"+ @@ -55,6 +56,7 @@ func main() { cfg.Debug, cfg.Api.VcfPath, cfg.Api.GtfPath, + cfg.Api.BulkIndexingCap, cfg.Api.FileProcessingConcurrencyLevel, cfg.Api.LineProcessingConcurrencyLevel, cfg.Elasticsearch.Url, cfg.Elasticsearch.Username, diff --git a/src/api/models/config.go b/src/api/models/config.go index b72b8d9b..5f9b316f 100644 --- a/src/api/models/config.go +++ b/src/api/models/config.go @@ -8,6 +8,7 @@ type Config struct { Port string `yaml:"port" envconfig:"GOHAN_API_INTERNAL_PORT"` VcfPath string `yaml:"vcfPath" envconfig:"GOHAN_API_VCF_PATH"` LocalVcfPath string `yaml:"localVcfPath" envconfig:"GOHAN_API_VCF_PATH"` + BulkIndexingCap int `yaml:"bulkIndexingCap" envconfig:"GOHAN_API_BULK_INDEXING_CAP"` FileProcessingConcurrencyLevel int `yaml:"fileProcessingConcurrencyLevel" envconfig:"GOHAN_API_FILE_PROC_CONC_LVL"` LineProcessingConcurrencyLevel int `yaml:"lineProcessingConcurrencyLevel" envconfig:"GOHAN_API_LINE_PROC_CONC_LVL"` GtfPath string `yaml:"gtfPath" envconfig:"GOHAN_API_GTF_PATH"` diff --git a/src/api/models/dtos.go b/src/api/models/dtos.go index c8a0d4ea..e57e417e 100644 --- a/src/api/models/dtos.go +++ b/src/api/models/dtos.go @@ -17,6 +17,8 @@ type BentoV2CompatibleVariantResponseDataModel struct { GenotypeType 
string `json:"genotype_type"` } +// -- + type VariantsResponseDTO struct { Status int `json:"status"` Message string `json:"message"` @@ -29,6 +31,8 @@ type VariantResponseDataModel struct { Results interface{} `json:"results"` // i.e.: []Variant or []string } +// -- -- + type GenesResponseDTO struct { Status int `json:"status"` Message string `json:"message"` diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go index 77f1502f..c1e9baff 100644 --- a/src/api/services/ingestion.go +++ b/src/api/services/ingestion.go @@ -50,10 +50,6 @@ type ( } ) -const ( - defaultBulkIndexingCap int = 10000 // TODO: make parameterizable -) - func NewIngestionService(es *elasticsearch.Client, cfg *models.Config) *IngestionService { iz := &IngestionService{ @@ -62,15 +58,15 @@ func NewIngestionService(es *elasticsearch.Client, cfg *models.Config) *Ingestio IngestRequestMap: map[string]*ingest.VariantIngestRequest{}, GeneIngestRequestChan: make(chan *ingest.GeneIngestRequest), GeneIngestRequestMap: map[string]*ingest.GeneIngestRequest{}, - IngestionBulkIndexingCapacity: defaultBulkIndexingCap, - IngestionBulkIndexingQueue: make(chan *structs.IngestionQueueStructure, defaultBulkIndexingCap), + IngestionBulkIndexingCapacity: cfg.Api.BulkIndexingCap, + IngestionBulkIndexingQueue: make(chan *structs.IngestionQueueStructure, cfg.Api.BulkIndexingCap), GeneIngestionBulkIndexingQueue: make(chan *structs.GeneIngestionQueueStructure, 10), ConcurrentFileIngestionQueue: make(chan bool, cfg.Api.FileProcessingConcurrencyLevel), ElasticsearchClient: es, } //see: https://www.elastic.co/blog/why-am-i-seeing-bulk-rejections-in-my-elasticsearch-cluster - var numWorkers = defaultBulkIndexingCap / 100 + var numWorkers = iz.IngestionBulkIndexingCapacity / 100 //the lower the denominator (the number of documents per bulk upload). the higher //the chances of 100% successful upload, though the longer it may take (negligible)