Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Features/retrieve_last_ingested #55

Merged
merged 22 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/api/models/indexes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package indexes

import (
c "gohan/api/models/constants"
"time"
)

type Variant struct {
Expand All @@ -17,9 +18,10 @@ type Variant struct {

Sample Sample `json:"sample"`

FileId string `json:"fileId"`
Dataset string `json:"dataset"`
AssemblyId c.AssemblyId `json:"assemblyId"`
FileId string `json:"fileId"`
Dataset string `json:"dataset"`
AssemblyId c.AssemblyId `json:"assemblyId"`
CreatedTime time.Time `json:"createdTime"`
}

type Info struct {
Expand Down
25 changes: 15 additions & 10 deletions src/api/mvc/data-types/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,31 @@ var variantDataTypeJson = map[string]interface{}{
"metadata_schema": schemas.OBJECT_SCHEMA,
}

func GetDataTypes(c echo.Context) error {
es := c.(*contexts.GohanContext).Es7Client
cfg := c.(*contexts.GohanContext).Config
func fetchVariantData(c echo.Context) (map[string]interface{}, error) {
gc := c.(*contexts.GohanContext)
cfg := gc.Config
es := gc.Es7Client

// accumulate number of variants associated with each
// sampleId fetched from the variants overview
resultsMap, err := variantService.GetVariantsOverview(es, cfg)
if err != nil {
return nil, err
}
variantDataTypeJson["count"] = sumAllValues(resultsMap["sampleIDs"])
variantDataTypeJson["last_ingested"] = resultsMap["last_ingested"]

return variantDataTypeJson, nil
}

func GetDataTypes(c echo.Context) error {
variantData, err := fetchVariantData(c)
if err != nil {
// Could not talk to Elasticsearch, return an error
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
"error": err.Error(),
})
}

variantDataTypeJson["count"] = sumAllValues(resultsMap["sampleIDs"])

// Data types are basically stand-ins for schema blocks
return c.JSON(http.StatusOK, []map[string]interface{}{
variantDataTypeJson,
variantData,
})
}

Expand Down
57 changes: 47 additions & 10 deletions src/api/mvc/variants/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,40 @@ func GetDatasetVariantsCount(c echo.Context) int {
return int(totalVariantsCount)
}

func GetLastCreatedVariantForDataset(c echo.Context) string {
gc := c.(*contexts.GohanContext)
cfg := gc.Config
es := gc.Es7Client

dataset := gc.Dataset
fmt.Printf("Fetching the last 'created' timestamp for dataset: %s\n", dataset)

var (
lastCreatedTimestamp string
g = new(errgroup.Group)
)

g.Go(func() error {
timestamp, timestampError := esRepo.GetMostRecentVariantTimestamp(cfg, es, dataset.String())
if timestampError != nil {
fmt.Printf("Failed to fetch the most recent 'created' timestamp for dataset %s. Error: %v\n", dataset, timestampError)
return timestampError
}

lastCreatedTimestamp = timestamp.Format(time.RFC3339)
fmt.Printf("Fetched timestamp for dataset %s is: %s\n", dataset, lastCreatedTimestamp)
return nil
})

// wait for the HTTP fetch to complete.
if err := g.Wait(); err != nil {
fmt.Printf("Encountered an error while fetching data: %v\n", err)
} else {
fmt.Printf("Successfully Obtained Dataset '%s' most recent 'created' timestamp: '%s' \n", dataset, lastCreatedTimestamp)
}
return lastCreatedTimestamp
}

func GetDatasetSummary(c echo.Context) error {

gc := c.(*contexts.GohanContext)
Expand Down Expand Up @@ -600,24 +634,27 @@ func ClearDataset(c echo.Context) error {
}

type DataTypeSummary struct {
Id string `json:"id"`
Label string `json:"label"`
Queryable bool `json:"queryable"`
Schema map[string]interface{} `json:"schema"`
Count int `json:"count"`
Id string `json:"id"`
Label string `json:"label"`
Queryable bool `json:"queryable"`
Schema map[string]interface{} `json:"schema"`
Count int `json:"count"`
LastCreated string `json:"last_ingested"`
}

type DataTypeResponseDto = []DataTypeSummary

func GetDatasetDataTypes(c echo.Context) error {
count := GetDatasetVariantsCount(c)
last_ingested := GetLastCreatedVariantForDataset(c)
return c.JSON(http.StatusOK, &DataTypeResponseDto{
DataTypeSummary{
Id: "variant",
Label: "Variants",
Queryable: true,
Schema: schemas.VARIANT_SCHEMA,
Count: count,
Id: "variant",
Label: "Variants",
Queryable: true,
Schema: schemas.VARIANT_SCHEMA,
Count: count,
LastCreated: last_ingested,
},
})
}
Expand Down
85 changes: 84 additions & 1 deletion src/api/repositories/elasticsearch/variants.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,85 @@ func GetDocumentsContainerVariantOrSampleIdInPositionRange(cfg *models.Config, e
return result, nil
}

func GetMostRecentVariantTimestamp(cfg *models.Config, es *elasticsearch.Client, dataset string) (time.Time, error) {
// Initialize a zero-value timestamp
var mostRecentTimestamp time.Time

// Setup the Elasticsearch query to fetch the most recent 'created' timestamp
var buf bytes.Buffer
query := map[string]interface{}{
"query": map[string]interface{}{
"term": map[string]string{
"dataset.keyword": dataset,
},
},
"size": 1,
"sort": []map[string]interface{}{
{
"createdTime": map[string]string{
"order": "desc",
},
},
},
}

// Encode the query to JSON
if err := json.NewEncoder(&buf).Encode(query); err != nil {
log.Fatalf("Error encoding query: %s\n", err)
return mostRecentTimestamp, err
}

// Print the constructed query for debugging
fmt.Println("Constructed Elasticsearch Query:", string(buf.Bytes()))

// Execute the query against Elasticsearch
res, searchErr := es.Search(
es.Search.WithContext(context.Background()),
es.Search.WithIndex(wildcardVariantsIndex),
es.Search.WithBody(&buf),
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
)

if searchErr != nil {
fmt.Printf("Error executing search request: %s\n", searchErr)
return mostRecentTimestamp, searchErr
}
defer res.Body.Close()

// Parse the response
var result map[string]interface{}
decoder := json.NewDecoder(res.Body)
if err := decoder.Decode(&result); err != nil {
fmt.Printf("Error unmarshalling Elasticsearch response: %s\n", err)
return mostRecentTimestamp, err
}

// Extract the 'created' timestamp from the first hit (if available)
if hits, found := result["hits"].(map[string]interface{}); found {
if hitSlice, hitFound := hits["hits"].([]interface{}); hitFound && len(hitSlice) > 0 {
if firstHit, firstHitFound := hitSlice[0].(map[string]interface{}); firstHitFound {
if source, sourceFound := firstHit["_source"].(map[string]interface{}); sourceFound {
if created, createdFound := source["createdTime"].(string); createdFound {
parsedTime, err := time.Parse(time.RFC3339, created)
if err == nil {
mostRecentTimestamp = parsedTime
} else {
fmt.Printf("Error parsing 'createdTime' timestamp: %s\n", err)
v-rocheleau marked this conversation as resolved.
Show resolved Hide resolved
return mostRecentTimestamp, err
}
}
}
}
} else {
fmt.Printf("No hits found for dataset: %s\n", dataset)
return mostRecentTimestamp, nil
}
}

return mostRecentTimestamp, nil
}

func CountDocumentsContainerVariantOrSampleIdInPositionRange(cfg *models.Config, es *elasticsearch.Client,
chromosome string, lowerBound int, upperBound int,
variantId string, sampleId string, datasetString string,
Expand Down Expand Up @@ -525,6 +604,11 @@ func GetVariantsBucketsByKeyword(cfg *models.Config, es *elasticsearch.Client, k
},
},
},
"last_ingested": map[string]interface{}{
"max": map[string]interface{}{
"field": "createdTime",
},
},
},
}

Expand Down Expand Up @@ -669,7 +753,6 @@ func GetVariantsBucketsByKeywordAndDataset(cfg *models.Config, es *elasticsearch
}

func DeleteVariantsByDatasetId(cfg *models.Config, es *elasticsearch.Client, dataset string) (map[string]interface{}, error) {

var buf bytes.Buffer
query := map[string]interface{}{
"query": map[string]interface{}{
Expand Down
2 changes: 2 additions & 0 deletions src/api/services/ingestion.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,8 @@ func (i *IngestionService) ProcessVcf(
var resultingVariant indexes.Variant
mapstructure.Decode(tmpVariant, &resultingVariant)

resultingVariant.CreatedTime = time.Now()

// pass variant (along with a waitgroup) to the channel
i.IngestionBulkIndexingQueue <- &structs.IngestionQueueStructure{
Variant: &resultingVariant,
Expand Down
31 changes: 31 additions & 0 deletions src/api/services/variants/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"gohan/api/models"
esRepo "gohan/api/repositories/elasticsearch"
"sync"
"time"

"github.com/elastic/go-elasticsearch/v7"
)
Expand All @@ -29,6 +30,32 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri
resultsMux := sync.RWMutex{}

var wg sync.WaitGroup

callProcessLatestCreated := func(key string, keyword string, _wg *sync.WaitGroup) {
noctillion marked this conversation as resolved.
Show resolved Hide resolved
defer _wg.Done()

results, err := esRepo.GetVariantsBucketsByKeyword(cfg, es, keyword)
if err != nil {
resultsMux.Lock()
resultsMap[key+"_error"] = "Failed to get latest created time."
resultsMux.Unlock()
return
}

var formattedTime string
if aggs, ok := results["aggregations"].(map[string]interface{}); ok {
if latest, ok := aggs["last_ingested"].(map[string]interface{}); ok {
if timestamp, ok := latest["value"].(float64); ok {
formattedTime = time.UnixMilli(int64(timestamp)).UTC().Format(time.RFC3339)
}
}
}

resultsMux.Lock()
resultsMap[key] = formattedTime
resultsMux.Unlock()
}

callGetBucketsByKeyword := func(key string, keyword string, _wg *sync.WaitGroup) {
defer _wg.Done()

Expand Down Expand Up @@ -97,6 +124,10 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri
wg.Add(1)
go callGetBucketsByKeyword("datasets", "dataset.keyword", &wg)

// get last ingested variant
wg.Add(1)
go callProcessLatestCreated("last_ingested", "last_ingested.keyword", &wg)

wg.Wait()

return resultsMap, nil
Expand Down
17 changes: 13 additions & 4 deletions src/api/tests/build/api/variants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,22 @@ func TestDemoVcfIngestion(t *testing.T) {
for oK, oV := range overviewJson {
assert.NotNil(t, oV)

assert.NotNil(t, overviewJson[oK])
assert.NotNil(t, overviewJson[oK].(map[string]interface{}))
// handle 'last_ingested' as a string
if oK == "last_ingested" {
_, ok := oV.(string)
assert.True(t, ok)
continue
}

// assert the value is a map for other keys
mapValue, ok := oV.(map[string]interface{})
assert.True(t, ok)

for k, v := range oV.(map[string]interface{}) {
for k, v := range mapValue {
key := k
assert.NotNil(t, v)
value := v.(float64)
value, ok := v.(float64)
assert.True(t, ok)
assert.NotNil(t, key)
assert.NotEmpty(t, key)
assert.NotEmpty(t, value)
Expand Down
4 changes: 4 additions & 0 deletions src/api/tests/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func GetVariantsOverview(_t *testing.T, _cfg *models.Config) map[string]interfac
assert.True(_t, sidkOk)
assert.NotNil(_t, sampleIDsKey)

lastIngestionKey, likOk := overviewRespJson["last_ingested"]
assert.True(_t, likOk)
assert.NotNil(_t, lastIngestionKey)

return overviewRespJson
}

Expand Down