diff --git a/.gitignore b/.gitignore
index 6eb188c9..dd455cc7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -43,4 +43,7 @@ bin/*
 
 # tmp directories
 */tmp
-*/*/tmp
\ No newline at end of file
+*/*/tmp
+
+*.vcf
+*.vcf.gz
\ No newline at end of file
diff --git a/preprocess.sh b/preprocess.sh
new file mode 100644
index 00000000..f573d1ea
--- /dev/null
+++ b/preprocess.sh
@@ -0,0 +1,75 @@
+#!/bin/bash
+
+# credits:
+# - https://www.biostars.org/p/78929/
+
+FILE=$1
+
+if [[ "$FILE" == "" ]];then
+  echo Missing gz file name!
+  exit 1
+fi
+
+if [[ -f $FILE ]];then
+  echo "$FILE exists"
+else
+  echo "$FILE doesn't exist"
+  exit 1
+fi
+
+echo
+echo Preprocessing :
+echo
+echo Splitting $FILE into individual VCFs using PERL
+echo
+echo Step 1 : creating common and private txt files -
+echo This may take a while...
+
+time zcat $FILE | perl -lane '
+if (/^#/) { if (/^##/) { print STDERR } else {
+  print STDERR join "\t", @F[0..8]; @samples = @F;
+} } else {
+  print STDERR join "\t", @F[0..8];
+  for ($i = 9; $i <= $#F; $i++) {
+    if ($F[$i] =~ /^..[1-9]/) {
+      print STDOUT join "\t", $samples[$i], $lc, $F[$i];
+} } } $lc++' 2> vcfs/_vcf.common.txt | sort -k1,1 -k2,2n > vcfs/_vcf.private.txt
+
+echo Step 2 : converting common and private txt files to individual VCF files -
+echo This also may take a while...
+
+mkdir -p vcfs/split
+time perl -lane 'BEGIN {
+open IN, "vcfs/_vcf.common.txt" or die $!;
+chomp(@common = <IN>); foreach (@common) {
+  if (/^##/) { $headers .= "$_\n" } else { $headers .= $_; last }
+} close IN }
+if ($F[0] ne $previousSample) {
+  close OUT if $previousSample;
+  open OUT, ">vcfs/split/$F[0].vcf";
+  print OUT "$headers\t$F[0]";
+} $previousSample = $F[0];
+print OUT "$common[$F[1]]\t$F[2]";
+END { close OUT }' vcfs/_vcf.private.txt
+
+echo Step 3 : compressing individual VCF files -
+echo This also may take a while...
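+# gzip each per-sample VCF in place (-f overwrites any .gz left over
+# from a previous run); tabix indexing is left commented out for now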
+
+time for file in vcfs/split/*vcf; do
+  gzip -f $file;
+  # tabix -fp vcf $file.gz
+done
+
+# for file in split/*vcf.gz; do
+#   gunzip $file
+# done
+
+# rm *vcf.gz
+# rm *vcf.gz.tbi
+
+# Clean up
+mv vcfs/split/*.vcf.gz vcfs/
+rmdir vcfs/split/
+
+rm vcfs/_vcf.private.txt
+rm vcfs/_vcf.common.txt
diff --git a/src/api/main.go b/src/api/main.go
index af67e982..1a48214b 100644
--- a/src/api/main.go
+++ b/src/api/main.go
@@ -65,7 +65,7 @@ func main() {
 
     // Service Singletons
     az := services.NewAuthzService(&cfg)
-    iz := services.NewIngestionService()
+    iz := services.NewIngestionService(es)
 
     // Configure Server
     e.Use(middleware.Recover())
diff --git a/src/api/mvc/variants.go b/src/api/mvc/variants.go
index a3b74e86..fc421755 100644
--- a/src/api/mvc/variants.go
+++ b/src/api/mvc/variants.go
@@ -73,8 +73,6 @@ func VariantsCountBySampleId(c echo.Context) error {
 }
 
 func VariantsIngest(c echo.Context) error {
-    es := c.(*contexts.GohanContext).Es7Client
-
     cfg := c.(*contexts.GohanContext).Config
     vcfPath := cfg.Api.VcfPath
     drsUrl := cfg.Drs.Url
@@ -115,7 +113,7 @@ func VariantsIngest(c echo.Context) error {
         }
     }
 
-    fmt.Printf("Found .vcf.gz files: %s\n", vcfGzfiles)
+    //fmt.Printf("Found .vcf.gz files: %s\n", vcfGzfiles)
 
     // Locate fileName from request inside found files
     for _, fileName := range fileNames {
@@ -213,7 +211,7 @@ func VariantsIngest(c echo.Context) error {
             }
 
             // --- load back into memory and process
-            ingestionService.ProcessVcf(vcfFilePath, drsFileId, es)
+            ingestionService.ProcessVcf(vcfFilePath, drsFileId)
 
             // --- delete the temporary vcf file
             os.Remove(vcfFilePath)
@@ -222,7 +220,7 @@ func VariantsIngest(c echo.Context) error {
             // (WARNING : Only do this when running over a single file)
             //os.RemoveAll(vcfTmpPath)
 
-            fmt.Printf("Ingest duration for file at %s : %s\n", vcfFilePath, time.Now().Sub(startTime))
+            fmt.Printf("Ingest duration for file at %s : %s\n", vcfFilePath, time.Since(startTime))
 
             reqStat.State = ingest.Done
             ingestionService.IngestRequestChan <- reqStat
diff --git a/src/api/repositories/elasticsearch/elasticsearch.go b/src/api/repositories/elasticsearch/elasticsearch.go
index 315ee945..c21acec9 100644
--- a/src/api/repositories/elasticsearch/elasticsearch.go
+++ b/src/api/repositories/elasticsearch/elasticsearch.go
@@ -174,7 +174,7 @@ func GetDocumentsContainerVariantOrSampleIdInPositionRange(es *elasticsearch.Cli
 
     // Temp
     resultString := res.String()
-    fmt.Println(resultString)
+    // fmt.Println(resultString)
     // --
 
     // Declared an empty interface
@@ -316,7 +316,7 @@ func CountDocumentsContainerVariantOrSampleIdInPositionRange(es *elasticsearch.C
 
     // Temp
     resultString := res.String()
-    fmt.Println(resultString)
+    // fmt.Println(resultString)
     // --
 
     // Declared an empty interface
@@ -373,7 +373,7 @@ func GetBucketsByKeyword(es *elasticsearch.Client, keyword string) map[string]in
 
     // Temp
     resultString := res.String()
-    fmt.Println(resultString)
+    //fmt.Println(resultString)
     // --
 
     // Declared an empty interface
diff --git a/src/api/services/ingestion.go b/src/api/services/ingestion.go
index 06304e26..aa67bafe 100644
--- a/src/api/services/ingestion.go
+++ b/src/api/services/ingestion.go
@@ -19,11 +19,9 @@ import (
     "os"
     "path/filepath"
     "regexp"
-    "runtime"
     "strconv"
     "strings"
     "sync"
-    "sync/atomic"
     "time"
 
     "github.com/Jeffail/gabs"
@@ -34,18 +32,49 @@ import (
 
 type (
     IngestionService struct {
-        Initialized       bool
-        IngestRequestChan chan *ingest.IngestRequest
-        IngestRequestMap  map[string]*ingest.IngestRequest
+        Initialized                   bool
+        IngestRequestChan             chan *ingest.IngestRequest
+        IngestRequestMap              map[string]*ingest.IngestRequest
+        IngestionBulkIndexingCapacity int
+        IngestionBulkIndexingQueue    chan *IngestionQueueStructure
+        ElasticsearchClient           *elasticsearch.Client
+        IngestionBulkIndexer          esutil.BulkIndexer
+    }
+
+    IngestionQueueStructure struct {
+        Variant   *models.Variant
+        WaitGroup *sync.WaitGroup
+    }
 )
 
-func NewIngestionService() *IngestionService {
+const defaultBulkIndexingCap int = 10000
+
+func NewIngestionService(es *elasticsearch.Client) *IngestionService {
+
     iz := &IngestionService{
-        Initialized:       false,
-        IngestRequestChan: make(chan *ingest.IngestRequest),
-        IngestRequestMap:  map[string]*ingest.IngestRequest{},
+        Initialized:                   false,
+        IngestRequestChan:             make(chan *ingest.IngestRequest),
+        IngestRequestMap:              map[string]*ingest.IngestRequest{},
+        IngestionBulkIndexingCapacity: defaultBulkIndexingCap,
+        IngestionBulkIndexingQueue:    make(chan *IngestionQueueStructure, defaultBulkIndexingCap),
+        ElasticsearchClient:           es,
     }
+
+    // see: https://www.elastic.co/blog/why-am-i-seeing-bulk-rejections-in-my-elasticsearch-cluster
+    var numWorkers = defaultBulkIndexingCap / 100
+    // the lower the denominator (the number of documents per bulk upload), the higher
+    // the chance of a 100% successful upload, though the longer it may take (negligible)
+
+    bi, _ := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
+        Index:      "variants",
+        Client:     iz.ElasticsearchClient,
+        NumWorkers: numWorkers,
+        // FlushBytes: int(flushBytes), // The flush threshold in bytes (default: 50MB ?)
+        FlushInterval: 30 * time.Second, // The periodic flush interval
+    })
+
+    iz.IngestionBulkIndexer = bi
+
     iz.Init()
 
     return iz
@@ -53,8 +82,8 @@ func NewIngestionService() *IngestionService {
 
 func (i *IngestionService) Init() {
     // safeguard to prevent multiple initilizations
-    if i.Initialized == false {
-        // spin up a listener for state updates
+    if !i.Initialized {
+        // spin up a listener for ingest request updates
         go func() {
             for {
                 select {
@@ -68,6 +97,58 @@ func (i *IngestionService) Init() {
                 }
             }
         }()
+
+        // spin up a listener for bulk indexing
+        go func() {
+            for {
+                select {
+                case queuedItem := <-i.IngestionBulkIndexingQueue:
+
+                    v := queuedItem.Variant
+                    wg := queuedItem.WaitGroup
+
+                    // Prepare the data payload: encode variant to JSON
+                    data, err := json.Marshal(v)
+                    if err != nil {
+                        log.Fatalf("Cannot encode variant %s: %s\n", v.Id, err)
+                    }
+
+                    // Add an item to the BulkIndexer
+                    err = i.IngestionBulkIndexer.Add(
+                        context.Background(),
+                        esutil.BulkIndexerItem{
+                            // Action field configures the operation to perform (index, create, delete, update)
+                            Action: "index",
+
+                            // Body is an `io.Reader` with the payload
+                            Body: bytes.NewReader(data),
+
+                            // OnSuccess is called for each successful operation
+                            OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
+                                defer wg.Done()
+                                //atomic.AddUint64(&countSuccessful, 1)
+                            },
+
+                            // OnFailure is called for each failed operation
+                            OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
+                                defer wg.Done()
+                                //atomic.AddUint64(&countFailed, 1)
+                                if err != nil {
+                                    fmt.Printf("ERROR: %s", err)
+                                } else {
+                                    fmt.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
+                                }
+                            },
+                        },
+                    )
+                    if err != nil {
+                        wg.Done()
+                        fmt.Printf("Unexpected error: %s", err)
+                    }
+                }
+            }
+        }()
+
         i.Initialized = true
     }
 }
@@ -146,7 +227,7 @@ func (i *IngestionService) UploadVcfGzToDrs(gzippedFilePath string, gzipStream *
     return id
 }
 
-func (i *IngestionService) ProcessVcf(vcfFilePath string, drsFileId string, es *elasticsearch.Client) {
+func (i *IngestionService) ProcessVcf(vcfFilePath string, drsFileId string) {
     f, err := os.Open(vcfFilePath)
     if err != nil {
         fmt.Println("Failed to open file - ", err)
@@ -158,11 +239,10 @@ func (i *IngestionService) ProcessVcf(vcfFilePath string, drsFileId string, es *
     var discoveredHeaders bool = false
     var headers []string
 
-    var variants []*models.Variant
-    variantsMutex := sync.RWMutex{}
-
     nonNumericRegexp := regexp.MustCompile("[^.0-9]")
 
+    var _fileWG sync.WaitGroup
+
     for scanner.Scan() {
         //fmt.Println(scanner.Text())
 
@@ -182,8 +262,8 @@ func (i *IngestionService) ProcessVcf(vcfFilePath string, drsFileId string, es *
             }
         }
 
-        go func(line string, drsFileId string) {
-
+        _fileWG.Add(1)
+        go func(line string, drsFileId string, fileWg *sync.WaitGroup) {
             // ---- break up line
             rowComponents := strings.Split(line, "\t")
@@ -285,87 +365,17 @@ func (i *IngestionService) ProcessVcf(vcfFilePath string, drsFileId string, es *
             var resultingVariant models.Variant
             mapstructure.Decode(tmpVariant, &resultingVariant)
 
-            variantsMutex.Lock()
-            variants = append(variants, &resultingVariant)
-            variantsMutex.Unlock()
-
-        }(line, drsFileId)
-    }
-
-    // --- push all data to the bulk indexer
-    fmt.Printf("Number of CPUs available: %d\n", runtime.NumCPU())
-
-    var countSuccessful uint64
-    var countFailed uint64
-
-    // see: https://www.elastic.co/blog/why-am-i-seeing-bulk-rejections-in-my-elasticsearch-cluster
-    var numWorkers = len(variants) / 50
-    // the lower the denominator (the number of documents per bulk upload). the higher
-    // the chances of 100% successful upload, though the longer it may take (negligible)
-
-    bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
-        Index:      "variants",
-        Client:     es,
-        NumWorkers: numWorkers,
-        // FlushBytes: int(flushBytes), // The flush threshold in bytes (default: 50MB ?)
-        FlushInterval: 30 * time.Second, // The periodic flush interval
-    })
-
-    var wg sync.WaitGroup
-
-    for _, v := range variants {
-
-        wg.Add(1)
-
-        // Prepare the data payload: encode article to JSON
-        data, err := json.Marshal(v)
-        if err != nil {
-            log.Fatalf("Cannot encode variant %s: %s\n", v.Id, err)
-        }
-
-        // Add an item to the BulkIndexer
-        err = bi.Add(
-            context.Background(),
-            esutil.BulkIndexerItem{
-                // Action field configures the operation to perform (index, create, delete, update)
-                Action: "index",
-
-                // Body is an `io.Reader` with the payload
-                Body: bytes.NewReader(data),
-
-                // OnSuccess is called for each successful operation
-                OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
-                    defer wg.Done()
-                    atomic.AddUint64(&countSuccessful, 1)
-
-                    //log.Printf("Added item: %s", item.DocumentID)
-                },
+            // pass variant (along with a waitgroup) to the channel
+            i.IngestionBulkIndexingQueue <- &IngestionQueueStructure{
+                Variant:   &resultingVariant,
+                WaitGroup: fileWg,
+            }
 
-                // OnFailure is called for each failed operation
-                OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
-                    defer wg.Done()
-                    atomic.AddUint64(&countFailed, 1)
-                    if err != nil {
-                        fmt.Printf("ERROR: %s", err)
-                    } else {
-                        fmt.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
-                    }
-                },
-            },
-        )
-        if err != nil {
-            defer wg.Done()
-            fmt.Printf("Unexpected error: %s", err)
-        }
+        }(line, drsFileId, &_fileWG)
     }
 
-    wg.Wait()
-
-    fmt.Printf("Done processing %s with %d variants, with %d stats!\n", vcfFilePath, len(variants), bi.Stats())
-
-    if err := scanner.Err(); err != nil {
-        log.Fatal(err)
-    }
+    // let all lines be queued up and processed
+    _fileWG.Wait()
 }
 
 func (i *IngestionService) FilenameAlreadyRunning(filename string) bool {
diff --git a/src/tests/integration/api/api_test.go b/src/tests/integration/api/api_test.go
index b2bc2987..c5d4e283 100644
--- a/src/tests/integration/api/api_test.go
+++ b/src/tests/integration/api/api_test.go
@@ -9,6 +9,8 @@ import (
     "testing"
 
     common "tests/common"
 
+    . "github.com/ahmetb/go-linq"
"github.com/ahmetb/go-linq" + "github.com/stretchr/testify/assert" ) @@ -95,85 +97,80 @@ func TestCanGetVariantsWithSamplesInResultset(t *testing.T) { allDtoResponses := getAllDtosOfVariousCombinationsOfChromosomesAndSampleIds(t, true, "") - // assert that at least one of the responses include a valid sample set - noSamplesAreEmpty := true - for _, dtoResponse := range allDtoResponses { - for _, datum := range dtoResponse.Data { - for _, result := range datum.Results { - for _, sample := range result.Samples { - if sample.SampleId == "" || - sample.Variation == "" { - noSamplesAreEmpty = false - break - } - } - } - } - } - - assert.True(t, noSamplesAreEmpty) + // assert that all of the responses include valid sample sets + // - * accumulate all samples into a single list using the set of + // SelectManyT's and the SelectT + // - ** iterate over each sample in the ForEachT + + From(allDtoResponses).SelectManyT(func(resp models.VariantsResponseDTO) Query { // * + return From(resp.Data) + }).SelectManyT(func(data models.VariantResponseDataModel) Query { + return From(data.Results) + }).SelectManyT(func(variant models.Variant) Query { + return From(variant.Samples) + }).SelectT(func(sample models.Sample) models.Sample { + return sample + }).ForEachT(func(sample models.Sample) { // ** + assert.NotEmpty(t, sample.SampleId) + assert.NotEmpty(t, sample.Variation) + }) } func TestCanGetVariantsInAscendingPositionOrder(t *testing.T) { - + // retrieve responses in ascending order allDtoResponses := getAllDtosOfVariousCombinationsOfChromosomesAndSampleIds(t, false, "asc") - // assert that all variants are in ascending order - isAscendingOrder := true - for _, dtoResponse := range allDtoResponses { - for _, dtoResponse := range dtoResponse.Data { - for i, result := range dtoResponse.Results { - if i == 0 { - continue - } + // assert the dto response slice is plentiful + assert.NotNil(t, allDtoResponses) + + From(allDtoResponses).ForEachT(func(dto models.VariantsResponseDTO) { + // ensure there is data + assert.NotNil(t, dto.Data) + + // check the data + From(dto.Data).ForEachT(func(d models.VariantResponseDataModel) { + // ensure the variants slice is plentiful + assert.NotNil(t, d.Results) - // Fail if 'this' variant position is less than the previous - if result.Pos < dtoResponse.Results[i-1].Pos { - isAscendingOrder = false - break + latestSmallest := 0 + From(d.Results).ForEachT(func(dd models.Variant) { + // verify order + if latestSmallest != 0 { + assert.True(t, latestSmallest <= dd.Pos) } - } - if isAscendingOrder != true { // 1 failure = break all loops - break - } - } - if isAscendingOrder != true { // 1 failure = break all loops - break - } - } - assert.True(t, isAscendingOrder) + latestSmallest = dd.Pos + }) + }) + }) } func TestCanGetVariantsInDescendingPositionOrder(t *testing.T) { - + // retrieve responses in descending order allDtoResponses := getAllDtosOfVariousCombinationsOfChromosomesAndSampleIds(t, false, "desc") - // assert that all variants are in ascending order - isDescendingOrder := true - for _, dtoResponse := range allDtoResponses { - for _, dtoResponse := range dtoResponse.Data { - for i, result := range dtoResponse.Results { - if i == 0 { - continue - } + // assert the dto response slice is plentiful + assert.NotNil(t, allDtoResponses) - // Fail if 'this' variant position is greater than the previous - if result.Pos > dtoResponse.Results[i-1].Pos { - isDescendingOrder = false - break + From(allDtoResponses).ForEachT(func(dto models.VariantsResponseDTO) { + // ensure there 
+        assert.NotNil(t, dto.Data)
+
+        // check the data
+        From(dto.Data).ForEachT(func(d models.VariantResponseDataModel) {
+            // ensure the variants slice is non-nil
+            assert.NotNil(t, d.Results)
+
+            latestGreatest := 0
+            From(d.Results).ForEachT(func(dd models.Variant) {
+                if latestGreatest != 0 {
+                    assert.True(t, latestGreatest >= dd.Pos)
                 }
-            }
-            if isDescendingOrder != true { // 1 failure = break all loops
-                break
-            }
-        }
-        if isDescendingOrder != true { // 1 failure = break all loops
-            break
-        }
-    }
-    assert.True(t, isDescendingOrder)
+                latestGreatest = dd.Pos
+            })
+        })
+    })
 }
 
 // -- Common utility functions for api tests