diff --git a/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go b/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go index c77d5642c..88a71e55d 100644 --- a/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go +++ b/migrations/20231128_jellyfish_migration/20231128_jellyfish_migration.go @@ -14,8 +14,6 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - - "github.com/tidepool-org/platform/migrations/20231128_jellyfish_migration/utils" ) type Config struct { @@ -390,6 +388,8 @@ func (m *Migration) fetchAndUpdateBatch() bool { Sort: bson.M{"_id": 1}, }, ) + //dDataCursor.SetBatchSize(1000) + if err != nil { log.Printf("failed to select data: %s", err) return false @@ -399,33 +399,30 @@ func (m *Migration) fetchAndUpdateBatch() bool { log.Printf("fetch took %s", time.Since(fetchStart)) updateStart := time.Now() - var totalDuration time.Duration for dDataCursor.Next(m.ctx) { - start := time.Now() - var dDataResult bson.M - if err = dDataCursor.Decode(&dDataResult); err != nil { - log.Printf("failed decoding data: %s", err) - return false - } - log.Printf("cursor decode %s", time.Since(start)) - datumID, datumUpdates, err := utils.GetDatumUpdates(dDataResult) - if err != nil { - m.onError(err, datumID, "failed getting updates") - continue - } - log.Printf("datum updates %s", time.Since(start)) - updateOp := mongo.NewUpdateOneModel() - updateOp.SetFilter(bson.M{"_id": datumID, "modifiedTime": dDataResult["modifiedTime"]}) - updateOp.SetUpdate(datumUpdates) - m.updates = append(m.updates, updateOp) - m.lastUpdatedId = datumID - log.Printf("added to updates %s", time.Since(start)) - totalDuration += time.Since(start) + // start := time.Now() + // var dDataResult bson.M + // if err = dDataCursor.Decode(&dDataResult); err != nil { + // log.Printf("failed decoding data: %s", err) + // return false + // } + // log.Printf("cursor decode %s", time.Since(start)) + // datumID, datumUpdates, err := utils.GetDatumUpdates(dDataResult) + // if err != nil { + // m.onError(err, datumID, "failed getting updates") + // continue + // } + // log.Printf("datum updates %s", time.Since(start)) + // updateOp := mongo.NewUpdateOneModel() + // updateOp.SetFilter(bson.M{"_id": datumID, "modifiedTime": dDataResult["modifiedTime"]}) + // updateOp.SetUpdate(datumUpdates) + // m.updates = append(m.updates, updateOp) + // m.lastUpdatedId = datumID + // log.Printf("added to updates %s", time.Since(start)) } - log.Printf("all datum %s", totalDuration) - log.Printf("batch update took %s", time.Since(updateStart)) + log.Printf("batch iteration took %s", time.Since(updateStart)) log.Printf("fetch and update took %s", time.Since(fetchAndUpdateStart)) return len(m.updates) > 0 } @@ -433,6 +430,9 @@ func (m *Migration) fetchAndUpdateBatch() bool { } func (m *Migration) writeBatchUpdates() (int, error) { + if len(m.updates) == 0 { + return 0, nil + } start := time.Now() var getBatches = func(chunkSize int) [][]mongo.WriteModel { batches := [][]mongo.WriteModel{} diff --git a/migrations/20231128_jellyfish_migration/utils/utils.go b/migrations/20231128_jellyfish_migration/utils/utils.go index 9dc99fc62..371adce1f 100644 --- a/migrations/20231128_jellyfish_migration/utils/utils.go +++ b/migrations/20231128_jellyfish_migration/utils/utils.go @@ -83,72 +83,76 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) { var rename bson.M var identityFields []string - var errorHandler = func(id string, err error) (string, bson.M, error) { - return id, nil, err - } - datumID, ok := bsonData["_id"].(string) if !ok { - return errorHandler("", errors.New("cannot get the datum id")) + return "", nil, errors.New("cannot get the datum id") } datumType, ok := bsonData["type"].(string) if !ok { - return errorHandler(datumID, errors.New("cannot get the datum type")) - } - - //log.Printf("updates bsonData marshal start %s", time.Since(start)) - dataBytes, err := bson.Marshal(bsonData) - if err != nil { - return errorHandler(datumID, err) + return datumID, nil, errors.New("cannot get the datum type") } - //log.Printf("updates bsonData marshal end %s", time.Since(start)) - switch datumType { case basal.Type: //log.Printf("updating basal start %s", time.Since(start)) var datum *basal.Basal + dataBytes, err := bson.Marshal(bsonData) + if err != nil { + return datumID, nil, err + } err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } identityFields, err = datum.IdentityFields() if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } case bolus.Type: //log.Printf("updating bolus start %s", time.Since(start)) var datum *bolus.Bolus + dataBytes, err := bson.Marshal(bsonData) + if err != nil { + return datumID, nil, err + } err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } identityFields, err = datum.IdentityFields() if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } case device.Type: //log.Printf("updating device event start %s", time.Since(start)) var datum *bolus.Bolus + dataBytes, err := bson.Marshal(bsonData) + if err != nil { + return datumID, nil, err + } err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } identityFields, err = datum.IdentityFields() if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } case pump.Type: //log.Printf("updating pump settings start %s", time.Since(start)) - var datum *types.Base + var datum types.Base + dataBytes, err := bson.Marshal(bsonData) + if err != nil { + return datumID, nil, err + } err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } identityFields, err = datum.IdentityFields() if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } if pumpSettingsHasBolus(bsonData) { @@ -157,16 +161,20 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) { sleepSchedules, err := updateIfExistsPumpSettingsSleepSchedules(bsonData) if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } else if sleepSchedules != nil { set["sleepSchedules"] = sleepSchedules } case selfmonitored.Type: //log.Printf("updating smbg start %s", time.Since(start)) var datum *selfmonitored.SelfMonitored + dataBytes, err := bson.Marshal(bsonData) + if err != nil { + return datumID, nil, err + } err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } if *datum.Units != glucose.MgdL && *datum.Units != glucose.Mgdl { // NOTE: we need to ensure the same precision for the @@ -176,14 +184,18 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) { } identityFields, err = datum.IdentityFields() if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } case ketone.Type: //log.Printf("updating ketone start %s", time.Since(start)) var datum *ketone.Ketone + dataBytes, err := bson.Marshal(bsonData) + if err != nil { + return datumID, nil, err + } err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } if *datum.Units != glucose.MgdL && *datum.Units != glucose.Mgdl { // NOTE: we need to ensure the same precision for the @@ -193,14 +205,18 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) { } identityFields, err = datum.IdentityFields() if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } case continuous.Type: //log.Printf("updating cbg start %s", time.Since(start)) var datum *continuous.Continuous + dataBytes, err := bson.Marshal(bsonData) + if err != nil { + return datumID, nil, err + } err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } if *datum.Units != glucose.MgdL && *datum.Units != glucose.Mgdl { // NOTE: we need to ensure the same precision for the @@ -210,18 +226,22 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) { } identityFields, err = datum.IdentityFields() if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } default: //log.Printf("updating generic start %s", time.Since(start)) var datum *types.Base + dataBytes, err := bson.Marshal(bsonData) + if err != nil { + return datumID, nil, err + } err = bson.Unmarshal(dataBytes, &datum) if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } identityFields, err = datum.IdentityFields() if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } } @@ -229,7 +249,7 @@ func GetDatumUpdates(bsonData bson.M) (string, bson.M, error) { //log.Printf("generate hash start %s", time.Since(start)) hash, err := deduplicator.GenerateIdentityHash(identityFields) if err != nil { - return errorHandler(datumID, err) + return datumID, nil, err } //log.Printf("generate hash end %s", time.Since(start))