Skip to content

Commit

Permalink
ingest genes api
Browse files Browse the repository at this point in the history
  • Loading branch information
brouillette committed Oct 5, 2021
1 parent 8cc66e1 commit 10ec719
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 32 deletions.
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ services:
- GOHAN_DRS_BASIC_AUTH_PASSWORD=${GOHAN_DRS_BASIC_AUTH_PASSWORD}
volumes:
- ${GOHAN_API_VCF_PATH}:${GOHAN_API_CONTAINERIZED_VCF_PATH}
- ${GOHAN_API_GTF_PATH}:${GOHAN_API_CONTAINERIZED_GTF_PATH}

elasticsearch:
image: ${GOHAN_ES_BASE_IMAGE}:${GOHAN_ES_BASE_VERSION}
Expand Down
3 changes: 2 additions & 1 deletion src/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,11 @@ func main() {

// -- Genes
e.GET("/genes/overview", mvc.GetGenesOverview)
e.GET("/genes/ingest", mvc.GenesIngest)
e.GET("/genes/search", mvc.GenesGetByNomenclatureWildcard,
// middleware
gam.ValidateOptionalChromosomeAttribute)
e.GET("/genes/ingestion/requests", mvc.GetAllGeneIngestionRequests)
e.GET("/genes/ingestion/run", mvc.GenesIngest)

// Run
e.Logger.Fatal(e.Start(":" + cfg.Api.Port))
Expand Down
19 changes: 14 additions & 5 deletions src/api/models/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
type State string

const (
Queued State = "Queued"
Running = "Running"
Done = "Done"
Error = "Error"
Queued State = "Queued"
Downloading = "Downloading"
Running = "Running"
Done = "Done"
Error = "Error"
)

type IngestRequest struct {
type VariantIngestRequest struct {
Id uuid.UUID `json:"id"`
Filename string `json:"filename"`
State State `json:"state"`
Expand All @@ -22,6 +23,14 @@ type IngestRequest struct {
UpdatedAt string `json:"updatedAt"`
}

type GeneIngestRequest struct {
Filename string `json:"filename"`
State State `json:"state"`
Message string `json:"message"`
CreatedAt string `json:"createdAt"`
UpdatedAt string `json:"updatedAt"`
}

type IngestResponseDTO struct {
Id uuid.UUID `json:"id"`
Filename string `json:"filename"`
Expand Down
105 changes: 86 additions & 19 deletions src/api/mvc/genes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"api/models/constants"
assemblyId "api/models/constants/assembly-id"
"api/models/constants/chromosome"
"api/models/ingest"
"api/models/ingest/structs"
esRepo "api/repositories/elasticsearch"
"bufio"
Expand All @@ -20,6 +21,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/labstack/echo"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -58,10 +60,20 @@ func GenesIngest(c echo.Context) error {

for assId, fileName := range assemblyIdMap {
assemblyWg.Add(1)
go func(_assId constants.AssemblyId, _fileName string, _assemblyWg *sync.WaitGroup) {

newRequestState := ingest.GeneIngestRequest{
Filename: fileName,
State: ingest.Queued,
CreatedAt: fmt.Sprintf("%s", time.Now()),
}

go func(_assId constants.AssemblyId, _fileName string, _assemblyWg *sync.WaitGroup, reqStat *ingest.GeneIngestRequest) {
defer _assemblyWg.Done()

var geneWg sync.WaitGroup
var (
unzippedFileName string
geneWg sync.WaitGroup
)
gtfFile, err := os.Open(fmt.Sprintf("%s/%s", gtfPath, _fileName))
if err != nil {
// log.Fatalf("failed to open file: %s", err)
Expand All @@ -80,77 +92,112 @@ func GenesIngest(c echo.Context) error {
// Create blank file
file, err := os.Create(fmt.Sprintf("%s/%s", gtfPath, _fileName))
if err != nil {
log.Fatal(err)
msg := "Something went wrong: " + err.Error()
fmt.Println(msg)

reqStat.State = ingest.Error
reqStat.Message = msg
iz.GeneIngestRequestChan <- reqStat
}
client := http.Client{
CheckRedirect: func(r *http.Request, via []*http.Request) error {
r.URL.Opaque = r.URL.Path
return nil
},
}

fmt.Printf("Downloading file %s ...\n", _fileName)
reqStat.State = ingest.Downloading
iz.GeneIngestRequestChan <- reqStat

// Put content on file
resp, err := client.Get(fullURLFile)
if err != nil {
log.Fatal(err)
msg := "Something went wrong: " + err.Error()
fmt.Println(msg)

reqStat.State = ingest.Error
reqStat.Message = msg
iz.GeneIngestRequestChan <- reqStat
}
defer resp.Body.Close()

size, err := io.Copy(file, resp.Body)
if err != nil {
log.Fatal(err)
msg := "Something went wrong: " + err.Error()
fmt.Println(msg)

reqStat.State = ingest.Error
reqStat.Message = msg
iz.GeneIngestRequestChan <- reqStat
}
defer file.Close()

fmt.Printf("Downloaded a file %s with size %d\n", _fileName, size)

fmt.Printf("Unzipping %s...\n", _fileName)
gzipfile, err := os.Open(fmt.Sprintf("%s/%s", gtfPath, _fileName))
unzippedFile, err := os.Open(fmt.Sprintf("%s/%s", gtfPath, _fileName))
if err != nil {
fmt.Println(err)
os.Exit(1)
}

reader, err := gzip.NewReader(gzipfile)
reader, err := gzip.NewReader(unzippedFile)
if err != nil {
fmt.Println(err)
os.Exit(1)
msg := "Something went wrong: " + err.Error()
fmt.Println(msg)

reqStat.State = ingest.Error
reqStat.Message = msg
iz.GeneIngestRequestChan <- reqStat
}
defer reader.Close()

newfilename := strings.TrimSuffix(_fileName, ".gz")
unzippedFileName = strings.TrimSuffix(_fileName, ".gz")

writer, err := os.Create(fmt.Sprintf("%s/%s", gtfPath, newfilename))
writer, err := os.Create(fmt.Sprintf("%s/%s", gtfPath, unzippedFileName))

if err != nil {
fmt.Println(err)
os.Exit(1)
msg := "Something went wrong: " + err.Error()
fmt.Println(msg)

reqStat.State = ingest.Error
reqStat.Message = msg
iz.GeneIngestRequestChan <- reqStat
}

defer writer.Close()

if _, err = io.Copy(writer, reader); err != nil {
fmt.Println(err)
os.Exit(1)
msg := "Something went wrong: " + err.Error()
fmt.Println(msg)

reqStat.State = ingest.Error
reqStat.Message = msg
iz.GeneIngestRequestChan <- reqStat
}

fmt.Printf("Opening %s\n", newfilename)
gtfFile, _ = os.Open(fmt.Sprintf("%s/%s", gtfPath, newfilename))
fmt.Printf("Opening %s\n", unzippedFileName)
gtfFile, _ = os.Open(fmt.Sprintf("%s/%s", gtfPath, unzippedFileName))

fmt.Printf("Deleting %s\n", _fileName)
err = os.Remove(fmt.Sprintf("%s/%s", gtfPath, _fileName))
if err != nil {
fmt.Println(err)
}
} else {
// for the rare occurences where the file wasn't deleted
// after ingestion (i.e. some kind of interruption), this ensures it does
unzippedFileName = _fileName
}

defer gtfFile.Close()

fileScanner := bufio.NewScanner(gtfFile)
fileScanner.Split(bufio.ScanLines)

fmt.Printf("Ingesting %s\n", string(_assId))
reqStat.State = ingest.Running
iz.GeneIngestRequestChan <- reqStat

var (
chromHeaderKey = 0
Expand Down Expand Up @@ -249,7 +296,16 @@ func GenesIngest(c echo.Context) error {
geneWg.Wait()

fmt.Printf("%s ingestion done!\n", _assId)
}(assId, fileName, &assemblyWg)
fmt.Printf("Deleting %s\n", unzippedFileName)
err = os.Remove(fmt.Sprintf("%s/%s", gtfPath, unzippedFileName))
if err != nil {
fmt.Println(err)
}

reqStat.State = ingest.Done
iz.GeneIngestRequestChan <- reqStat

}(assId, fileName, &assemblyWg, &newRequestState)
}

assemblyWg.Wait()
Expand All @@ -258,6 +314,17 @@ func GenesIngest(c echo.Context) error {
return c.JSON(http.StatusOK, "{\"message\":\"please check in with /genes/overview !\"}")
}

func GetAllGeneIngestionRequests(c echo.Context) error {
izMap := c.(*contexts.GohanContext).IngestionService.GeneIngestRequestMap

// transform map of it-to-ingestRequests to an array
m := make([]*ingest.GeneIngestRequest, 0, len(izMap))
for _, val := range izMap {
m = append(m, val)
}
return c.JSON(http.StatusOK, m)
}

func GenesGetByNomenclatureWildcard(c echo.Context) error {
cfg := c.(*contexts.GohanContext).Config
es := c.(*contexts.GohanContext).Es7Client
Expand Down
6 changes: 3 additions & 3 deletions src/api/mvc/variants.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func VariantsIngest(c echo.Context) error {
}

// if not, execute
newRequestState := &ingest.IngestRequest{
newRequestState := &ingest.VariantIngestRequest{
Id: uuid.New(),
Filename: fileName,
State: ingest.Queued,
Expand All @@ -171,7 +171,7 @@ func VariantsIngest(c echo.Context) error {
Message: "Successfully queued..",
})

go func(file string, reqStat *ingest.IngestRequest) {
go func(file string, reqStat *ingest.VariantIngestRequest) {

reqStat.State = ingest.Running
ingestionService.IngestRequestChan <- reqStat
Expand Down Expand Up @@ -304,7 +304,7 @@ func GetAllVariantIngestionRequests(c echo.Context) error {
izMap := c.(*contexts.GohanContext).IngestionService.IngestRequestMap

// transform map of it-to-ingestRequests to an array
m := make([]*ingest.IngestRequest, 0, len(izMap))
m := make([]*ingest.VariantIngestRequest, 0, len(izMap))
for _, val := range izMap {
m = append(m, val)
}
Expand Down
26 changes: 22 additions & 4 deletions src/api/services/ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ import (
type (
IngestionService struct {
Initialized bool
IngestRequestChan chan *ingest.IngestRequest
IngestRequestMap map[string]*ingest.IngestRequest
IngestRequestChan chan *ingest.VariantIngestRequest
IngestRequestMap map[string]*ingest.VariantIngestRequest
GeneIngestRequestChan chan *ingest.GeneIngestRequest
GeneIngestRequestMap map[string]*ingest.GeneIngestRequest
IngestionBulkIndexingCapacity int
ElasticsearchClient *elasticsearch.Client
IngestionBulkIndexingQueue chan *structs.IngestionQueueStructure
Expand All @@ -53,8 +55,10 @@ func NewIngestionService(es *elasticsearch.Client) *IngestionService {

iz := &IngestionService{
Initialized: false,
IngestRequestChan: make(chan *ingest.IngestRequest),
IngestRequestMap: map[string]*ingest.IngestRequest{},
IngestRequestChan: make(chan *ingest.VariantIngestRequest),
IngestRequestMap: map[string]*ingest.VariantIngestRequest{},
GeneIngestRequestChan: make(chan *ingest.GeneIngestRequest),
GeneIngestRequestMap: map[string]*ingest.GeneIngestRequest{},
IngestionBulkIndexingCapacity: defaultBulkIndexingCap,
IngestionBulkIndexingQueue: make(chan *structs.IngestionQueueStructure, defaultBulkIndexingCap),
GeneIngestionBulkIndexingQueue: make(chan *structs.GeneIngestionQueueStructure, 10),
Expand Down Expand Up @@ -107,6 +111,20 @@ func (i *IngestionService) Init() {
}
}()

go func() {
for {
select {
case newRequest := <-i.GeneIngestRequestChan:
if newRequest.State == ingest.Queued {
fmt.Printf("Received new request for %s", newRequest.Filename)
}

newRequest.UpdatedAt = fmt.Sprintf("%s", time.Now())
i.GeneIngestRequestMap[newRequest.Filename] = newRequest
}
}
}()

// spin up a listener for each bulk indexing
go func() {
for {
Expand Down

0 comments on commit 10ec719

Please sign in to comment.