Skip to content

Commit

Permalink
Fix #396 Refactor fetch all data agreement records to use updated pip…
Browse files Browse the repository at this point in the history
…eline
  • Loading branch information
albinpa authored and georgepadayatti committed Oct 27, 2023
1 parent 991e329 commit 5b81f98
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 68 deletions.
119 changes: 63 additions & 56 deletions src/v2/dataagreement_record/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,29 +102,38 @@ func (darRepo *DataAgreementRecordRepository) CountDataAgreementRecords(dataAgre
return count, nil
}

// PipelineForList creates pipeline for list data agreement records
func PipelineForList(organisationId string, id string, lawfulBasis string, isId bool, isLawfulBasis bool) ([]primitive.M, error) {
var pipeline []primitive.M

var pipelineForIdExists []primitive.M
if isId {
dataAgreementRecordId, err := primitive.ObjectIDFromHex(id)
if err != nil {
return []bson.M{}, err
// CreatePipelineForFilteringDataAgreementRecords This pipeline is used for filtering data agreement records by `id` and `lawfulBasis`
// `id` has 3 possible values - dataAgreementRecordId, dataAgreementId, individualId
func CreatePipelineForFilteringDataAgreementRecords(organisationId string, id string, lawfulBasis string) ([]primitive.M, error) {

var pipeline []bson.M

// Stage 1 - Match by `organisationId` and `isDeleted=false`
pipeline = append(pipeline, bson.M{"$match": bson.M{"organisationid": organisationId, "isdeleted": false}})

if len(id) > 0 {

or := []bson.M{
{"dataagreementid": id},
{"individualid": id},
}

pipelineForIdExists = []bson.M{
{"$match": bson.M{
"$or": []bson.M{
{"_id": dataAgreementRecordId},
{"dataagreementid": id},
{"individualid": id},
},
},
},
// Stage 2 - Match `id` against `dataAgreementRecordId`, `dataAgreementId`, `individualId`
convertIdtoObjectId, err := primitive.ObjectIDFromHex(id)
if err == nil {
// Append `dataAgreementRecordId` `or` statements only if
// string is converted to objectId without errors
or = append(or, bson.M{"_id": convertIdtoObjectId})
}

pipeline = append(pipeline, bson.M{"$match": bson.M{
"$or": or,
}})
}
lookupAgreementStage := bson.M{"$lookup": bson.M{

// Stage 3 - Lookup data agreement document by `dataAgreementId`
// This is done to obtain `policy` and `lawfulBasis` fields from data agreement document
pipeline = append(pipeline, bson.M{"$lookup": bson.M{
"from": "dataAgreements",
"let": bson.M{"localId": "$dataagreementid"},
"pipeline": bson.A{
Expand All @@ -137,9 +146,14 @@ func PipelineForList(organisationId string, id string, lawfulBasis string, isId
},
},
"as": "dataAgreements",
}}
unwindStage := bson.M{"$unwind": "$dataAgreements"}
lookupRevisionStage := bson.M{"$lookup": bson.M{
}})

// Stage 4 - Unwind the data agreement fields
pipeline = append(pipeline, bson.M{"$unwind": "$dataAgreements"})

// Stage 5 - Lookup revision by `dataAgreementRecordId`
// This is done to obtain timestamp for the latest revision of the data agreement record.
pipeline = append(pipeline, bson.M{"$lookup": bson.M{
"from": "revisions",
"let": bson.M{"localId": "$_id"},
"pipeline": bson.A{
Expand All @@ -150,47 +164,40 @@ func PipelineForList(organisationId string, id string, lawfulBasis string, isId
},
},
},
bson.M{
"$sort": bson.M{"timestamp": -1},
},
bson.M{"$limit": int64(1)},
},
"as": "Revisions",
}}
addRevisionFieldStage := bson.M{
"$addFields": bson.M{
"Revision": bson.M{
"$arrayElemAt": []interface{}{
bson.M{
"$slice": []interface{}{"$Revisions", -1},
},
0,
"as": "revisions",
}})

// Stage 6 - Add the timestamp from revisions
pipeline = append(pipeline, bson.M{"$addFields": bson.M{"timestamp": bson.M{
"$let": bson.M{
"vars": bson.M{
"first": bson.M{
"$arrayElemAt": bson.A{"$revisions", 0},
},
},
"in": "$$first.timestamp",
},
}
addTimestampFieldStage := bson.M{"$addFields": bson.M{"timestamp": "$Revision.timestamp"}}
projectStage := bson.M{
}}})

// Stage 7 - Remove revisions field
pipeline = append(pipeline, bson.M{
"$project": bson.M{
"Revisions": 0,
"Revision": 0,
"revisions": 0,
},
}
})

pipelineForIdNotExists := []bson.M{
{"$match": bson.M{"organisationid": organisationId, "isdeleted": false}},
}

lawfulBasisMatch := bson.M{
"$match": bson.M{
"dataAgreements.lawfulbasis": lawfulBasis,
},
}
if isId && isLawfulBasis {
pipeline = append(pipelineForIdExists, lookupAgreementStage, unwindStage, lookupRevisionStage, addRevisionFieldStage, addTimestampFieldStage, projectStage, lawfulBasisMatch)
} else if isId && !isLawfulBasis {
pipeline = append(pipelineForIdExists, lookupAgreementStage, unwindStage, lookupRevisionStage, addRevisionFieldStage, addTimestampFieldStage, projectStage)

} else if isLawfulBasis && !isId {
pipeline = append(pipelineForIdNotExists, lookupAgreementStage, unwindStage, lookupRevisionStage, addRevisionFieldStage, addTimestampFieldStage, projectStage, lawfulBasisMatch)
} else {
pipeline = []bson.M{}
// Stage 8 - Match by lawful basis
if len(lawfulBasis) > 0 {
pipeline = append(pipeline, bson.M{
"$match": bson.M{
"dataAgreements.lawfulbasis": lawfulBasis,
},
})
}

return pipeline, nil
Expand Down
12 changes: 3 additions & 9 deletions src/v2/handler/audit/audit_list_dataagreement_records.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,17 @@ func AuditListDataAgreementRecords(w http.ResponseWriter, r *http.Request) {
darRepo := daRecord.DataAgreementRecordRepository{}
darRepo.Init(organisationId)

var isIdExists bool
var isLawfulBasis bool
id, err := daRecord.ParseQueryParams(r, config.Id, daRecord.IdIsMissingError)
if err != nil && errors.Is(err, daRecord.IdIsMissingError) {
isIdExists = false
} else {
isIdExists = true
log.Println(err)
}

lawfulBasis, err := daRecord.ParseQueryParams(r, config.LawfulBasis, daRecord.LawfulBasisIsMissingError)
if err != nil && errors.Is(err, daRecord.LawfulBasisIsMissingError) {
isLawfulBasis = false
} else {
isLawfulBasis = true
log.Println(err)
}

pipeline, err := daRecord.PipelineForList(organisationId, id, lawfulBasis, isIdExists, isLawfulBasis)
pipeline, err := daRecord.CreatePipelineForFilteringDataAgreementRecords(organisationId, id, lawfulBasis)
if err != nil {
m := "Failed to create pipeline"
common.HandleErrorV2(w, http.StatusInternalServerError, m, err)
Expand Down
20 changes: 17 additions & 3 deletions src/v2/paginate/paginate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package paginate
import (
"context"
"errors"
"log"
"math"
"net/http"
"reflect"
Expand Down Expand Up @@ -229,6 +230,20 @@ func PaginateDBObjectsUsingPipeline(query PaginateDBObjectsQueryUsingPipeline, r

totalItems := len(dbObjectsForCount)

if totalItems == 0 {
return &PaginatedDBResult{
Items: []string{},
Pagination: Pagination{
CurrentPage: 1,
TotalItems: 0,
Limit: query.Limit,
TotalPages: 1,
HasPrevious: false,
HasNext: false,
},
}, nil
}

// Ensure offset is not negative and limit is positive
if query.Offset < 0 {
query.Offset = 0
Expand All @@ -237,13 +252,12 @@ func PaginateDBObjectsUsingPipeline(query PaginateDBObjectsQueryUsingPipeline, r
query.Limit = 1
}

log.Printf("Current totalItems: %d", totalItems)

// Ensure offset is within bounds
if query.Offset >= int(totalItems) {
query.Offset = int(totalItems) - query.Limit
}
if query.Offset < 0 {
query.Offset = 0
}

// Calculate pages and selected page based on offset and limit
totalPages := int(math.Ceil(float64(totalItems) / float64(query.Limit)))
Expand Down

0 comments on commit 5b81f98

Please sign in to comment.