Skip to content

Commit

Permalink
parameterized bulk indexing cap
Browse files Browse the repository at this point in the history
  • Loading branch information
brouillette committed Jan 3, 2022
1 parent dc75f71 commit a3ac48b
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 7 deletions.
1 change: 1 addition & 0 deletions etc/example.env
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ GOHAN_API_CONTAINERIZED_VCF_PATH=/app/vcfs
GOHAN_API_GTF_PATH=/path/to/gtfs/on/host/machine
GOHAN_API_CONTAINERIZED_GTF_PATH=/app/gtfs

GOHAN_API_BULK_INDEXING_CAP=10000
GOHAN_API_FILE_PROC_CONC_LVL=3
GOHAN_API_LINE_PROC_CONC_LVL=1000

Expand Down
2 changes: 2 additions & 0 deletions src/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func main() {

"\tVCF Directory Path : %s \n"+
"\tGTF Directory Path : %s \n"+
"\tBulk Indexing Cap : %d\n"+
"\tFile Processing Concurrency Level : %d\n"+
"\tLine Processing Concurrency Level : %d\n"+
"\tElasticsearch Url : %s \n"+
Expand All @@ -55,6 +56,7 @@ func main() {
cfg.Debug,
cfg.Api.VcfPath,
cfg.Api.GtfPath,
cfg.Api.BulkIndexingCap,
cfg.Api.FileProcessingConcurrencyLevel,
cfg.Api.LineProcessingConcurrencyLevel,
cfg.Elasticsearch.Url, cfg.Elasticsearch.Username,
Expand Down
1 change: 1 addition & 0 deletions src/api/models/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type Config struct {
Port string `yaml:"port" envconfig:"GOHAN_API_INTERNAL_PORT"`
VcfPath string `yaml:"vcfPath" envconfig:"GOHAN_API_VCF_PATH"`
LocalVcfPath string `yaml:"localVcfPath" envconfig:"GOHAN_API_VCF_PATH"`
BulkIndexingCap int `yaml:"BulkIndexingCap" envconfig:"GOHAN_API_BULK_INDEXING_CAP"`
FileProcessingConcurrencyLevel int `yaml:"fileProcessingConcurrencyLevel" envconfig:"GOHAN_API_FILE_PROC_CONC_LVL"`
LineProcessingConcurrencyLevel int `yaml:"lineProcessingConcurrencyLevel" envconfig:"GOHAN_API_LINE_PROC_CONC_LVL"`
GtfPath string `yaml:"gtfPath" envconfig:"GOHAN_API_GTF_PATH"`
Expand Down
4 changes: 4 additions & 0 deletions src/api/models/dtos.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type BentoV2CompatibleVariantResponseDataModel struct {
GenotypeType string `json:"genotype_type"`
}

// --

type VariantsResponseDTO struct {
Status int `json:"status"`
Message string `json:"message"`
Expand All @@ -29,6 +31,8 @@ type VariantResponseDataModel struct {
Results interface{} `json:"results"` // i.e.: []Variant or []string
}

// -- --

type GenesResponseDTO struct {
Status int `json:"status"`
Message string `json:"message"`
Expand Down
10 changes: 3 additions & 7 deletions src/api/services/ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ type (
}
)

const (
defaultBulkIndexingCap int = 10000 // TODO: make parameterizable
)

func NewIngestionService(es *elasticsearch.Client, cfg *models.Config) *IngestionService {

iz := &IngestionService{
Expand All @@ -62,15 +58,15 @@ func NewIngestionService(es *elasticsearch.Client, cfg *models.Config) *Ingestio
IngestRequestMap: map[string]*ingest.VariantIngestRequest{},
GeneIngestRequestChan: make(chan *ingest.GeneIngestRequest),
GeneIngestRequestMap: map[string]*ingest.GeneIngestRequest{},
IngestionBulkIndexingCapacity: defaultBulkIndexingCap,
IngestionBulkIndexingQueue: make(chan *structs.IngestionQueueStructure, defaultBulkIndexingCap),
IngestionBulkIndexingCapacity: cfg.Api.BulkIndexingCap,
IngestionBulkIndexingQueue: make(chan *structs.IngestionQueueStructure, cfg.Api.BulkIndexingCap),
GeneIngestionBulkIndexingQueue: make(chan *structs.GeneIngestionQueueStructure, 10),
ConcurrentFileIngestionQueue: make(chan bool, cfg.Api.FileProcessingConcurrencyLevel),
ElasticsearchClient: es,
}

//see: https://www.elastic.co/blog/why-am-i-seeing-bulk-rejections-in-my-elasticsearch-cluster
var numWorkers = defaultBulkIndexingCap / 100
var numWorkers = iz.IngestionBulkIndexingCapacity / 100
//the lower the denominator (the number of documents per bulk upload). the higher
//the chances of 100% successful upload, though the longer it may take (negligible)

Expand Down

0 comments on commit a3ac48b

Please sign in to comment.