Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Features/retrieve_last_ingested #55

Merged
merged 22 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func main() {

// -- Data-Types
e.GET("/data-types", dataTypesMvc.GetDataTypes)
e.GET("/public/data-types", dataTypesMvc.GetReducedDataTypes)
e.GET("/data-types/variant", dataTypesMvc.GetVariantDataType)
e.GET("/data-types/variant/schema", dataTypesMvc.GetVariantDataTypeSchema)
e.GET("/data-types/variant/metadata_schema", dataTypesMvc.GetVariantDataTypeMetadataSchema)
Expand Down
39 changes: 1 addition & 38 deletions src/api/mvc/data-types/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,8 @@ func fetchVariantData(c echo.Context) (map[string]interface{}, error) {
if err != nil {
return nil, err
}

variantDataTypeJson["count"] = sumAllValues(resultsMap["sampleIDs"])
if latestCreated, ok := resultsMap["last_created_time"].(string); ok {
variantDataTypeJson["last_ingested"] = latestCreated
}
variantDataTypeJson["last_ingested"] = resultsMap["last_ingested"]

return variantDataTypeJson, nil
}
Expand All @@ -50,40 +47,6 @@ func GetDataTypes(c echo.Context) error {
})
}

func GetReducedDataTypes(c echo.Context) error {
variantData, err := fetchVariantData(c)
if err != nil {
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
"error": err.Error(),
})
}

if variantData == nil {
return c.JSON(http.StatusInternalServerError, map[string]interface{}{
"error": "Failed to retrieve variant data.",
})
}

count, _ := variantData["count"]
id, _ := variantData["id"].(string)
label, _ := variantData["label"].(string)
last_ingested, _ := variantData["last_ingested"].(string)
queryable, _ := variantData["queryable"].(bool)

// Create a reduced response
reducedResponse := map[string]interface{}{
"count": count,
"id": id,
"label": label,
"last_ingested": last_ingested,
"queryable": queryable,
}

return c.JSON(http.StatusOK, []map[string]interface{}{
reducedResponse,
})
}

func GetVariantDataType(c echo.Context) error {
return c.JSON(http.StatusOK, variantDataTypeJson)
}
Expand Down
10 changes: 5 additions & 5 deletions src/api/repositories/elasticsearch/variants.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,14 @@ func GetMostRecentVariantTimestamp(cfg *models.Config, es *elasticsearch.Client,
mostRecentTimestamp = parsedTime
} else {
fmt.Printf("Error parsing 'createdTime' timestamp: %s\n", err)
v-rocheleau marked this conversation as resolved.
Show resolved Hide resolved
return mostRecentTimestamp, err
}
}
}
}
} else {
fmt.Println("No hits found for dataset:", dataset)
fmt.Printf("No hits found for dataset: %s\n", dataset)
return mostRecentTimestamp, nil
}
}

Expand Down Expand Up @@ -602,10 +604,9 @@ func GetVariantsBucketsByKeyword(cfg *models.Config, es *elasticsearch.Client, k
},
},
},
"latest_created": map[string]interface{}{
"last_ingested": map[string]interface{}{
"max": map[string]interface{}{
"field": "createdTime",
"format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
"field": "createdTime",
},
},
},
Expand Down Expand Up @@ -752,7 +753,6 @@ func GetVariantsBucketsByKeywordAndDataset(cfg *models.Config, es *elasticsearch
}

func DeleteVariantsByDatasetId(cfg *models.Config, es *elasticsearch.Client, dataset string) (map[string]interface{}, error) {

var buf bytes.Buffer
query := map[string]interface{}{
"query": map[string]interface{}{
Expand Down
56 changes: 39 additions & 17 deletions src/api/services/variants/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"gohan/api/models"
esRepo "gohan/api/repositories/elasticsearch"
"sync"
"time"

"github.com/elastic/go-elasticsearch/v7"
)
Expand All @@ -29,6 +30,32 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri
resultsMux := sync.RWMutex{}

var wg sync.WaitGroup

callProcessLatestCreated := func(key string, keyword string, _wg *sync.WaitGroup) {
noctillion marked this conversation as resolved.
Show resolved Hide resolved
defer _wg.Done()

results, err := esRepo.GetVariantsBucketsByKeyword(cfg, es, keyword)
if err != nil {
resultsMux.Lock()
resultsMap[key+"_error"] = "Failed to get latest created time."
resultsMux.Unlock()
return
}

var formattedTime string
if aggs, ok := results["aggregations"].(map[string]interface{}); ok {
if latest, ok := aggs["last_ingested"].(map[string]interface{}); ok {
if timestamp, ok := latest["value"].(float64); ok {
formattedTime = time.UnixMilli(int64(timestamp)).UTC().Format(time.RFC3339)
}
}
}

resultsMux.Lock()
resultsMap[key] = formattedTime
resultsMux.Unlock()
}

callGetBucketsByKeyword := func(key string, keyword string, _wg *sync.WaitGroup) {
defer _wg.Done()

Expand All @@ -45,17 +72,14 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri

// retrieve aggregations.items.buckets
bucketsMapped := []interface{}{}
if aggs, aggsOk := results["aggregations"].(map[string]interface{}); aggsOk {
if latest, latestOk := aggs["latest_created"].(map[string]interface{}); latestOk {
if valueAsString, valOk := latest["value_as_string"].(string); valOk {
resultsMux.Lock()
resultsMap["last_created_time"] = valueAsString
resultsMux.Unlock()
}
}
if items, itemsOk := aggs["items"].(map[string]interface{}); itemsOk {
if buckets, bucketsOk := items["buckets"].([]interface{}); bucketsOk {
bucketsMapped = buckets
if aggs, aggsOk := results["aggregations"]; aggsOk {
aggsMapped := aggs.(map[string]interface{})

if items, itemsOk := aggsMapped["items"]; itemsOk {
itemsMapped := items.(map[string]interface{})

if buckets, bucketsOk := itemsMapped["buckets"]; bucketsOk {
bucketsMapped = buckets.([]interface{})
}
}
}
Expand All @@ -80,12 +104,6 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri
return nil, errors.New("could not contact Elasticsearch - make sure it's running")
}

// Extract latest created time
if latest, exists := resultsMap["last_ingested"].(map[string]interface{}); exists {
latestCreatedTime := latest["value_as_string"].(string)
resultsMap["last_created_time"] = latestCreatedTime
}

// get distribution of chromosomes
wg.Add(1)
go callGetBucketsByKeyword("chromosomes", "chrom.keyword", &wg)
Expand All @@ -106,6 +124,10 @@ func GetVariantsOverview(es *elasticsearch.Client, cfg *models.Config) (map[stri
wg.Add(1)
go callGetBucketsByKeyword("datasets", "dataset.keyword", &wg)

// get last ingested variant
wg.Add(1)
go callProcessLatestCreated("last_ingested", "last_ingested.keyword", &wg)

wg.Wait()

return resultsMap, nil
Expand Down
17 changes: 13 additions & 4 deletions src/api/tests/build/api/variants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,22 @@ func TestDemoVcfIngestion(t *testing.T) {
for oK, oV := range overviewJson {
assert.NotNil(t, oV)

assert.NotNil(t, overviewJson[oK])
assert.NotNil(t, overviewJson[oK].(map[string]interface{}))
// handle 'last_ingested' as a string
if oK == "last_ingested" {
_, ok := oV.(string)
assert.True(t, ok)
continue
}

// assert the value is a map for other keys
mapValue, ok := oV.(map[string]interface{})
assert.True(t, ok)

for k, v := range oV.(map[string]interface{}) {
for k, v := range mapValue {
key := k
assert.NotNil(t, v)
value := v.(float64)
value, ok := v.(float64)
assert.True(t, ok)
assert.NotNil(t, key)
assert.NotEmpty(t, key)
assert.NotEmpty(t, value)
Expand Down
4 changes: 4 additions & 0 deletions src/api/tests/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func GetVariantsOverview(_t *testing.T, _cfg *models.Config) map[string]interfac
assert.True(_t, sidkOk)
assert.NotNil(_t, sampleIDsKey)

lastIngestionKey, likOk := overviewRespJson["last_ingested"]
assert.True(_t, likOk)
assert.NotNil(_t, lastIngestionKey)

return overviewRespJson
}

Expand Down