diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 3e96dfd18..7991a985c 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -6,10 +6,7 @@ concurrency: on: push: - branches: - - master - - staging - tags: + branches: [ master, staging, sprint* ] pull_request: jobs: diff --git a/code/go/0chain.net/blobbercore/allocation/copyfilechange_test.go b/code/go/0chain.net/blobbercore/allocation/copyfilechange_test.go index ce8219059..96b6da436 100644 --- a/code/go/0chain.net/blobbercore/allocation/copyfilechange_test.go +++ b/code/go/0chain.net/blobbercore/allocation/copyfilechange_test.go @@ -375,9 +375,7 @@ func TestBlobberCore_CopyFile(t *testing.T) { config.Configuration.MaxAllocationDirFiles = tc.maxDirFilesPerAlloc - ctx := context.TODO() - db := datastore.GetStore().GetDB().Begin() - ctx = context.WithValue(ctx, datastore.ContextKeyTransaction, db) + ctx := datastore.GetStore().CreateTransaction(context.TODO()) change := &CopyFileChange{ AllocationID: tc.allocationID, diff --git a/code/go/0chain.net/blobbercore/allocation/dao.go b/code/go/0chain.net/blobbercore/allocation/dao.go index d7b734564..edeea5ed1 100644 --- a/code/go/0chain.net/blobbercore/allocation/dao.go +++ b/code/go/0chain.net/blobbercore/allocation/dao.go @@ -2,6 +2,7 @@ package allocation import ( "context" + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/common" "github.com/0chain/errors" @@ -12,33 +13,25 @@ import ( // GetOrCreate, get allocation if it exists in db. if not, try to sync it from blockchain, and insert it in db. func GetOrCreate(ctx context.Context, store datastore.Store, allocationId string) (*Allocation, error) { - db := store.GetDB() + db := store.CreateTransaction(ctx) if len(allocationId) == 0 { return nil, errors.Throw(constants.ErrInvalidParameter, "tx") } - alloc := &Allocation{} - result := db.Table(TableNameAllocation).Where(SQLWhereGetById, allocationId).First(alloc) - - if result.Error == nil { + alloc, err := Repo.GetById(db, allocationId) + if err == nil { return alloc, nil } + if !errors.Is(err, gorm.ErrRecordNotFound) { - if !errors.Is(result.Error, gorm.ErrRecordNotFound) { - - return nil, errors.ThrowLog(result.Error.Error(), common.ErrBadDataStore) + return nil, errors.ThrowLog(err.Error(), common.ErrBadDataStore) } return SyncAllocation(allocationId) } -const ( - SQLWhereGetByTx = "allocations.tx = ?" - SQLWhereGetById = "allocations.id = ?" -) - // DryRun Creates a prepared statement when executing any SQL and caches them to speed up future calls // https://gorm.io/docs/performance.html#Caches-Prepared-Statement func DryRun(db *gorm.DB) { diff --git a/code/go/0chain.net/blobbercore/allocation/deletefilechange_test.go b/code/go/0chain.net/blobbercore/allocation/deletefilechange_test.go index 5761f889c..1688e4b75 100644 --- a/code/go/0chain.net/blobbercore/allocation/deletefilechange_test.go +++ b/code/go/0chain.net/blobbercore/allocation/deletefilechange_test.go @@ -254,9 +254,7 @@ func TestBlobberCore_DeleteFile(t *testing.T) { config.Configuration.MaxAllocationDirFiles = tc.maxDirFilesPerAlloc - ctx := context.TODO() - db := datastore.GetStore().GetDB().Begin() - ctx = context.WithValue(ctx, datastore.ContextKeyTransaction, db) + ctx := datastore.GetStore().CreateTransaction(context.TODO()) change := &DeleteFileChange{ AllocationID: tc.allocationID, diff --git a/code/go/0chain.net/blobbercore/allocation/entity.go b/code/go/0chain.net/blobbercore/allocation/entity.go index d4d953d98..4530a8322 100644 --- a/code/go/0chain.net/blobbercore/allocation/entity.go +++ b/code/go/0chain.net/blobbercore/allocation/entity.go @@ -1,10 +1,12 @@ package allocation import ( + "context" "errors" "fmt" "time" + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/common" "gorm.io/gorm/clause" @@ -162,7 +164,8 @@ func (*Pending) TableName() string { } // GetPendingWrite Get write size that is not yet redeemed -func GetPendingWrite(db *gorm.DB, clientID, allocationID string) (pendingWriteSize int64, err error) { +func GetPendingWrite(ctx context.Context, clientID, allocationID string) (pendingWriteSize int64, err error) { + db := datastore.GetStore().GetTransaction(ctx) err = db.Model(&Pending{}).Select("pending_write").Where( "id=?", fmt.Sprintf("%v:%v", clientID, allocationID), ).Scan(&pendingWriteSize).Error @@ -177,7 +180,8 @@ func GetPendingWrite(db *gorm.DB, clientID, allocationID string) (pendingWriteSi } // GetPendingRead Get read size that is not yet redeemed -func GetPendingRead(db *gorm.DB, clientID, allocationID string) (pendingReadSize int64, err error) { +func GetPendingRead(ctx context.Context, clientID, allocationID string) (pendingReadSize int64, err error) { + db := datastore.GetStore().GetTransaction(ctx) err = db.Model(&Pending{}).Select("pending_read").Where( "id=?", fmt.Sprintf("%v:%v", clientID, allocationID), ).Scan(&pendingReadSize).Error @@ -191,7 +195,8 @@ func GetPendingRead(db *gorm.DB, clientID, allocationID string) (pendingReadSize return } -func AddToPending(db *gorm.DB, clientID, allocationID string, pendingWrite int64) (err error) { +func AddToPending(ctx context.Context, clientID, allocationID string, pendingWrite int64) (err error) { + db := datastore.GetStore().GetTransaction(ctx) key := clientID + ":" + allocationID // Lock is required because two process can simultaneously call this function and read pending data // thus giving same value leading to inconsistent data @@ -215,7 +220,8 @@ func AddToPending(db *gorm.DB, clientID, allocationID string, pendingWrite int64 return nil } -func GetWritePoolsBalance(db *gorm.DB, allocationID string) (balance uint64, err error) { +func GetWritePoolsBalance(ctx context.Context, allocationID string) (balance uint64, err error) { + db := datastore.GetStore().GetTransaction(ctx) err = db.Model(&WritePool{}).Select("COALESCE(SUM(balance),0) as tot_balance").Where( "allocation_id = ?", allocationID, ).Scan(&balance).Error @@ -266,13 +272,14 @@ func (WritePool) TableName() string { return "write_pools" } -func GetReadPool(db *gorm.DB, clientID string) (*ReadPool, error) { +func GetReadPool(ctx context.Context, clientID string) (*ReadPool, error) { + db := datastore.GetStore().GetTransaction(ctx) var rp ReadPool return &rp, db.Model(&ReadPool{}).Where("client_id = ?", clientID).Scan(&rp).Error } -func GetReadPoolsBalance(db *gorm.DB, clientID string) (int64, error) { - rp, err := GetReadPool(db, clientID) +func GetReadPoolsBalance(ctx context.Context, clientID string) (int64, error) { + rp, err := GetReadPool(ctx, clientID) if err != nil { return 0, err } @@ -280,22 +287,24 @@ func GetReadPoolsBalance(db *gorm.DB, clientID string) (int64, error) { return rp.Balance, nil } -func UpsertReadPool(db *gorm.DB, rp *ReadPool) error { +func UpsertReadPool(ctx context.Context, rp *ReadPool) error { updateFields := []string{"balance"} - + db := datastore.GetStore().GetTransaction(ctx) return db.Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "client_id"}}, DoUpdates: clause.AssignmentColumns(updateFields), // column needed to be updated }).Create(&rp).Error } -func UpdateReadPool(db *gorm.DB, rp *ReadPool) error { +func UpdateReadPool(ctx context.Context, rp *ReadPool) error { + db := datastore.GetStore().GetTransaction(ctx) return db.Model(&ReadPool{}).Where("client_id = ?", rp.ClientID).Updates(map[string]interface{}{ "balance": rp.Balance, }).Error } -func SetWritePool(db *gorm.DB, allocationID string, wp *WritePool) (err error) { +func SetWritePool(ctx context.Context, allocationID string, wp *WritePool) (err error) { + db := datastore.GetStore().GetTransaction(ctx) err = db.Delete(&WritePool{}, "allocation_id = ?", allocationID).Error if err != nil { return diff --git a/code/go/0chain.net/blobbercore/allocation/file_changer_upload_test.go b/code/go/0chain.net/blobbercore/allocation/file_changer_upload_test.go index b2cd22da6..f75c8624f 100644 --- a/code/go/0chain.net/blobbercore/allocation/file_changer_upload_test.go +++ b/code/go/0chain.net/blobbercore/allocation/file_changer_upload_test.go @@ -95,9 +95,7 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) { config.Configuration.MaxAllocationDirFiles = tc.maxDirFilesPerAlloc - ctx := context.TODO() - db := datastore.GetStore().GetDB().Begin() - ctx = context.WithValue(ctx, datastore.ContextKeyTransaction, db) + ctx := datastore.GetStore().CreateTransaction(context.TODO()) fPath := "/new" change := &UploadFileChanger{ diff --git a/code/go/0chain.net/blobbercore/allocation/movefilechange_test.go b/code/go/0chain.net/blobbercore/allocation/movefilechange_test.go index 60600cc4d..ebdc70d12 100644 --- a/code/go/0chain.net/blobbercore/allocation/movefilechange_test.go +++ b/code/go/0chain.net/blobbercore/allocation/movefilechange_test.go @@ -468,9 +468,7 @@ func TestBlobberCore_MoveFile(t *testing.T) { config.Configuration.MaxAllocationDirFiles = tc.maxDirFilesPerAlloc - ctx := context.TODO() - db := datastore.GetStore().GetDB().Begin() - ctx = context.WithValue(ctx, datastore.ContextKeyTransaction, db) + ctx := datastore.GetStore().CreateTransaction(context.TODO()) change := &MoveFileChange{ AllocationID: tc.allocationID, diff --git a/code/go/0chain.net/blobbercore/allocation/multiop_test.go b/code/go/0chain.net/blobbercore/allocation/multiop_test.go index 3d48e23c6..917cd9030 100644 --- a/code/go/0chain.net/blobbercore/allocation/multiop_test.go +++ b/code/go/0chain.net/blobbercore/allocation/multiop_test.go @@ -32,9 +32,7 @@ func TestMultiOp(t *testing.T) { alloc.OwnerPublicKey = sch.GetPublicKey() alloc.OwnerID = client.GetClientID() datastore.MocketTheStore(t, true) - ctx := context.TODO() - db := datastore.GetStore().GetDB().Begin() - ctx = context.WithValue(ctx, datastore.ContextKeyTransaction, db) + ctx := datastore.GetStore().CreateTransaction(context.TODO()) setupDbMock() fileIDMeta := make(map[string]string) fileIDMeta["/"] = randName() diff --git a/code/go/0chain.net/blobbercore/allocation/protocol.go b/code/go/0chain.net/blobbercore/allocation/protocol.go index a50b3cbf8..7edd76cfa 100644 --- a/code/go/0chain.net/blobbercore/allocation/protocol.go +++ b/code/go/0chain.net/blobbercore/allocation/protocol.go @@ -14,19 +14,12 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/core/logging" "github.com/0chain/blobber/code/go/0chain.net/core/node" "github.com/0chain/blobber/code/go/0chain.net/core/transaction" - "go.uber.org/zap" "gorm.io/gorm" ) // GetAllocationByID from DB. This function doesn't load related terms. func GetAllocationByID(ctx context.Context, allocID string) (a *Allocation, err error) { - var tx = datastore.GetStore().GetTransaction(ctx) - - a = new(Allocation) - err = tx.Model(&Allocation{}). - Where("id=?", allocID). - First(a).Error - return + return Repo.GetById(ctx, allocID) } // LoadTerms loads corresponding terms from DB. Since, the GetAllocationByID @@ -49,14 +42,11 @@ func (a *Allocation) LoadTerms(ctx context.Context) (err error) { return // found in DB } -// VerifyAllocationTransaction try to get allocation from postgres.if it doesn't exists, get it from sharders, and insert it into postgres. +// FetchAllocationFromEventsDB try to get allocation from postgres.if it doesn't exists, get it from sharders, and insert it into postgres. func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, allocationTx string, readonly bool) (a *Allocation, err error) { var tx = datastore.GetStore().GetTransaction(ctx) - a = new(Allocation) - err = tx.Model(&Allocation{}). - Where(&Allocation{Tx: allocationTx}). - First(a).Error + a, err = Repo.GetByTx(ctx, allocationID, allocationTx) if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { return nil, common.NewError("bad_db_operation", err.Error()) // unexpected DB error @@ -81,21 +71,13 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc } var isExist bool - err = tx.Model(&Allocation{}). - Where("id = ?", sa.ID). - First(a).Error + a, err = Repo.GetById(ctx, sa.ID) if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { return nil, common.NewError("bad_db_operation", err.Error()) // unexpected } isExist = a.ID != "" - logging.Logger.Info("VerifyAllocationTransaction", - zap.Bool("isExist", isExist), - zap.Any("allocation", a), - zap.Any("storageAllocation", sa), - zap.String("node.Self.ID", node.Self.ID)) - if !isExist { foundBlobber := false for _, blobberConnection := range sa.BlobberDetails { @@ -136,6 +118,7 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc if err != nil { return nil, common.NewError("meta_data_update_error", err.Error()) } + // go update allocation data in file store map // related terms a.Terms = make([]*Terms, 0, len(sa.BlobberDetails)) @@ -154,7 +137,8 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc logging.Logger.Info("Saving the allocation to DB") - err = tx.Save(a).Error + err = Repo.Save(ctx, a) + if err != nil { return nil, err } @@ -181,13 +165,16 @@ func RequestReadPoolStat(clientID string) (*ReadPool, error) { return nil, fmt.Errorf("requesting read pools stat: %v", err) } - var readPool ReadPool - if err = json.Unmarshal(resp, &readPool); err != nil { - return nil, fmt.Errorf("decoding read pools stat response: %v, \n==> resp: %s", err, string(resp)) + if resp != nil { + var readPool ReadPool + if err = json.Unmarshal(resp, &readPool); err != nil { + return nil, fmt.Errorf("decoding read pools stat response: %v, \n==> resp: %s", err, string(resp)) + } + readPool.ClientID = clientID + return &readPool, nil } - readPool.ClientID = clientID - return &readPool, nil + return nil, errors.New("empty response received from MakeSCRestAPICall") } func RequestWritePool(allocationID string) (wps *WritePool, err error) { @@ -205,16 +192,20 @@ func RequestWritePool(allocationID string) (wps *WritePool, err error) { return nil, fmt.Errorf("requesting write pools stat: %v", err) } - var allocation = struct { - ID string `json:"id"` - WritePool uint64 `json:"write_pool"` - }{} - if err = json.Unmarshal(resp, &allocation); err != nil { - return nil, fmt.Errorf("decoding write pools stat response: %v", err) + if resp != nil { + var allocation = struct { + ID string `json:"id"` + WritePool uint64 `json:"write_pool"` + }{} + if err = json.Unmarshal(resp, &allocation); err != nil { + return nil, fmt.Errorf("decoding write pools stat response: %v", err) + } + + return &WritePool{ + AllocationID: allocationID, + Balance: allocation.WritePool, + }, nil } - return &WritePool{ - AllocationID: allocationID, - Balance: allocation.WritePool, - }, nil + return nil, errors.New("empty response received from MakeSCRestAPICall") } diff --git a/code/go/0chain.net/blobbercore/allocation/renamefilechange.go b/code/go/0chain.net/blobbercore/allocation/renamefilechange.go index 75145e6e3..43a78e583 100644 --- a/code/go/0chain.net/blobbercore/allocation/renamefilechange.go +++ b/code/go/0chain.net/blobbercore/allocation/renamefilechange.go @@ -24,7 +24,7 @@ func (rf *RenameFileChange) DeleteTempFile() error { return nil } -func (rf *RenameFileChange) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange, +func (rf *RenameFileChange) applyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange, allocationRoot string, ts common.Timestamp, _ map[string]string) (*reference.Ref, error) { if rf.Path == "/" { diff --git a/code/go/0chain.net/blobbercore/allocation/renamefilechange_integration.go b/code/go/0chain.net/blobbercore/allocation/renamefilechange_integration.go new file mode 100644 index 000000000..1e23b568a --- /dev/null +++ b/code/go/0chain.net/blobbercore/allocation/renamefilechange_integration.go @@ -0,0 +1,28 @@ +//go:build integration_tests +// +build integration_tests + +package allocation + +import ( + "context" + "errors" + + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" + "github.com/0chain/blobber/code/go/0chain.net/conductor/conductrpc" + "github.com/0chain/blobber/code/go/0chain.net/core/common" + "github.com/0chain/blobber/code/go/0chain.net/core/node" +) + +func (rf *RenameFileChange) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange, + allocationRoot string, ts common.Timestamp, _ map[string]string) (*reference.Ref, error) { + + state := conductrpc.Client().State() + if state.FailRenameCommit != nil { + for _, nodeId := range state.FailRenameCommit { + if nodeId == node.Self.ID { + return nil, errors.New("error directed by conductor") + } + } + } + return rf.applyChange(ctx, rootRef, change, allocationRoot, ts, nil) +} diff --git a/code/go/0chain.net/blobbercore/allocation/renamefilechange_main.go b/code/go/0chain.net/blobbercore/allocation/renamefilechange_main.go new file mode 100644 index 000000000..9c773cab7 --- /dev/null +++ b/code/go/0chain.net/blobbercore/allocation/renamefilechange_main.go @@ -0,0 +1,17 @@ +//go:build !integration_tests +// +build !integration_tests + +package allocation + +import ( + "context" + + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" + "github.com/0chain/blobber/code/go/0chain.net/core/common" +) + +func (rf *RenameFileChange) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange, + allocationRoot string, ts common.Timestamp, _ map[string]string) (*reference.Ref, error) { + + return rf.applyChange(ctx, rootRef, change, allocationRoot, ts, nil) +} diff --git a/code/go/0chain.net/blobbercore/allocation/renamefilechange_test.go b/code/go/0chain.net/blobbercore/allocation/renamefilechange_test.go index b65e7c587..2be71a223 100644 --- a/code/go/0chain.net/blobbercore/allocation/renamefilechange_test.go +++ b/code/go/0chain.net/blobbercore/allocation/renamefilechange_test.go @@ -431,9 +431,7 @@ func TestBlobberCore_RenameFile(t *testing.T) { datastore.MocketTheStore(t, true) tc.setupDbMock() - ctx := context.TODO() - db := datastore.GetStore().GetDB().Begin() - ctx = context.WithValue(ctx, datastore.ContextKeyTransaction, db) + ctx := datastore.GetStore().CreateTransaction(context.TODO()) t.Run(tc.name, func(t *testing.T) { change := &RenameFileChange{AllocationID: alloc.ID, Path: tc.path, NewName: tc.newName} rootRef, err := reference.GetReferencePathFromPaths(ctx, alloc.ID, []string{change.Path}, []string{}) diff --git a/code/go/0chain.net/blobbercore/allocation/repository.go b/code/go/0chain.net/blobbercore/allocation/repository.go new file mode 100644 index 000000000..e8bd9e7b9 --- /dev/null +++ b/code/go/0chain.net/blobbercore/allocation/repository.go @@ -0,0 +1,187 @@ +package allocation + +import ( + "context" + "fmt" + + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" + "github.com/0chain/blobber/code/go/0chain.net/core/logging" + "go.uber.org/zap" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +const ( + SQLWhereGetById = "allocations.id = ?" + SQLWhereGetByTx = "allocations.tx = ?" +) + +var ( + Repo *Repository +) + +func init() { + Repo = &Repository{} +} + +type Repository struct { +} + +type Res struct { + ID string +} + +func (r *Repository) GetById(ctx context.Context, id string) (*Allocation, error) { + tx := datastore.GetStore().GetTransaction(ctx) + if tx == nil { + logging.Logger.Panic("no transaction in the context") + } + + cache, err := getCache(tx) + if err != nil { + return nil, err + } + + if a, ok := cache[id]; ok { + return a, nil + } + + alloc := &Allocation{} + err = tx.Table(TableNameAllocation).Where(SQLWhereGetById, id).First(alloc).Error + if err != nil { + return alloc, err + } + + cache[id] = alloc + + return alloc, nil +} + +func (r *Repository) GetByIdAndLock(ctx context.Context, id string) (*Allocation, error) { + var tx = datastore.GetStore().GetTransaction(ctx) + if tx == nil { + logging.Logger.Panic("no transaction in the context") + } + + cache, err := getCache(tx) + if err != nil { + return nil, err + } + + alloc := &Allocation{} + + err = tx.Model(&Allocation{}). + Clauses(clause.Locking{Strength: "NO KEY UPDATE"}). + Where("id=?", id). + First(alloc).Error + if err != nil { + return alloc, err + } + cache[id] = alloc + + return alloc, err +} + +func (r *Repository) GetByTx(ctx context.Context, allocationID, txHash string) (*Allocation, error) { + var tx = datastore.GetStore().GetTransaction(ctx) + if tx == nil { + logging.Logger.Panic("no transaction in the context") + } + + cache, err := getCache(tx) + if err != nil { + return nil, err + } + if a, ok := cache[allocationID]; ok { + if a.Tx == txHash { + return a, nil + } + } + + alloc := &Allocation{} + err = tx.Table(TableNameAllocation).Where(SQLWhereGetByTx, txHash).First(alloc).Error + if err != nil { + return alloc, err + } + cache[allocationID] = alloc + + return alloc, err +} + +func (r *Repository) GetAllocations(ctx context.Context, offset int64) ([]*Allocation, error) { + var tx = datastore.GetStore().GetTransaction(ctx) + + const query = `finalized = false AND cleaned_up = false` + allocs := make([]*Allocation, 0) + return allocs, tx.Model(&Allocation{}). + Where(query). + Limit(UPDATE_LIMIT). + Offset(int(offset)). + Order("id ASC"). + Find(&allocs).Error +} + +func (r *Repository) GetAllocationIds(ctx context.Context) []Res { + var tx = datastore.GetStore().GetTransaction(ctx) + if tx == nil { + logging.Logger.Panic("no transaction in the context") + } + + var res []Res + + err := tx.Model(&Allocation{}).Select("id").Find(&res).Error + if err != nil && err != gorm.ErrRecordNotFound { + logging.Logger.Error("error_getting_allocations_worker", + zap.Any("error", err)) + } + + return res + +} + +func (r *Repository) UpdateAllocationRedeem(ctx context.Context, allocationID, AllocationRoot string) error { + var tx = datastore.GetStore().GetTransaction(ctx) + if tx == nil { + logging.Logger.Panic("no transaction in the context") + } + + cache, err := getCache(tx) + if err != nil { + return err + } + delete(cache, allocationID) + + err = tx.Exec("UPDATE allocations SET latest_redeemed_write_marker=?,is_redeem_required=? WHERE id=?", + AllocationRoot, false, allocationID).Error + + return err +} + +func (r *Repository) Save(ctx context.Context, a *Allocation) error { + var tx = datastore.GetStore().GetTransaction(ctx) + if tx == nil { + logging.Logger.Panic("no transaction in the context") + } + + cache, err := getCache(tx) + if err != nil { + return err + } + + cache[a.ID] = a + return tx.Save(a).Error +} + +func getCache(tx *datastore.EnhancedDB) (map[string]*Allocation, error) { + c, ok := tx.SessionCache[TableNameAllocation] + if ok { + cache, ok := c.(map[string]*Allocation) + if !ok { + return nil, fmt.Errorf("type assertion failed") + } + return cache, nil + } + cache := make(map[string]*Allocation) + tx.SessionCache[TableNameAllocation] = cache + return cache, nil +} diff --git a/code/go/0chain.net/blobbercore/allocation/rollback.go b/code/go/0chain.net/blobbercore/allocation/rollback.go index b656daee7..fe25adaac 100644 --- a/code/go/0chain.net/blobbercore/allocation/rollback.go +++ b/code/go/0chain.net/blobbercore/allocation/rollback.go @@ -6,7 +6,6 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" - "gorm.io/gorm" ) func ApplyRollback(ctx context.Context, allocationID string) error { @@ -15,24 +14,17 @@ func ApplyRollback(ctx context.Context, allocationID string) error { // delete all is_precommit rows - err := db.Transaction(func(tx *gorm.DB) error { - err := db.Model(&reference.Ref{}).Unscoped(). - Delete(&reference.Ref{}, - "allocation_id=? AND is_precommit=? AND deleted_at IS NULL", - allocationID, true).Error - if err != nil { - return err - } - - // err = db.Exec("UPDATE file_stats SET deleted_at=NULL WHERE ref_id IN (SELECT id FROM reference_objects WHERE allocation_id=? AND deleted_at IS NOT NULL)", allocationID).Error - // revive soft deleted ref rows - err = db.Exec("UPDATE reference_objects SET deleted_at=NULL,is_precommit=? WHERE allocation_id=? AND deleted_at IS NOT NULL", false, allocationID).Error - if err != nil { - return err - } - return nil - }) - + err := db.Model(&reference.Ref{}).Unscoped(). + Delete(&reference.Ref{}, + "allocation_id=? AND is_precommit=? AND deleted_at IS NULL", + allocationID, true).Error + if err != nil { + return err + } + + // err = db.Exec("UPDATE file_stats SET deleted_at=NULL WHERE ref_id IN (SELECT id FROM reference_objects WHERE allocation_id=? AND deleted_at IS NOT NULL)", allocationID).Error + // revive soft deleted ref rows + err = db.Exec("UPDATE reference_objects SET deleted_at=NULL,is_precommit=? WHERE allocation_id=? AND deleted_at IS NOT NULL", false, allocationID).Error return err } diff --git a/code/go/0chain.net/blobbercore/allocation/updatefilechange_test.go b/code/go/0chain.net/blobbercore/allocation/updatefilechange_test.go index bb739277e..d6e2238e6 100644 --- a/code/go/0chain.net/blobbercore/allocation/updatefilechange_test.go +++ b/code/go/0chain.net/blobbercore/allocation/updatefilechange_test.go @@ -336,9 +336,7 @@ func TestBlobberCore_UpdateFile(t *testing.T) { datastore.MocketTheStore(t, true) tc.setupDbMock() - ctx := context.TODO() - db := datastore.GetStore().GetDB().Begin() - ctx = context.WithValue(ctx, datastore.ContextKeyTransaction, db) + ctx := datastore.GetStore().CreateTransaction(context.TODO()) change := &UpdateFileChanger{ BaseFileChanger: BaseFileChanger{ diff --git a/code/go/0chain.net/blobbercore/allocation/workers.go b/code/go/0chain.net/blobbercore/allocation/workers.go index f04ac5a30..952c35a7c 100644 --- a/code/go/0chain.net/blobbercore/allocation/workers.go +++ b/code/go/0chain.net/blobbercore/allocation/workers.go @@ -79,7 +79,7 @@ func updateWork(ctx context.Context) { var ( allocs []*Allocation - count int64 + count int offset int64 err error @@ -87,7 +87,7 @@ func updateWork(ctx context.Context) { // iterate all in loop accepting allocations with limit - for start := true; start || (offset < count); start = false { + for start := true; start || (offset < int64(count)); start = false { allocs, count, err = findAllocations(ctx, offset) if err != nil { logging.Logger.Error("finding allocations in DB", zap.Error(err)) @@ -109,28 +109,15 @@ func updateWork(ctx context.Context) { } // not finalized, not cleaned up -func findAllocations(ctx context.Context, offset int64) (allocs []*Allocation, count int64, err error) { - const query = `finalized = false AND cleaned_up = false` +func findAllocations(ctx context.Context, offset int64) (allocs []*Allocation, count int, err error) { ctx = datastore.GetStore().CreateTransaction(ctx) var tx = datastore.GetStore().GetTransaction(ctx) defer tx.Rollback() - err = tx.Model(&Allocation{}).Where(query).Count(&count).Error - if err != nil { - logging.Logger.Error(err.Error()) - return - } - - allocs = make([]*Allocation, 0) - err = tx.Model(&Allocation{}). - Where(query). - Limit(UPDATE_LIMIT). - Offset(int(offset)). - Order("id ASC"). - Find(&allocs).Error - return + allocations, err := Repo.GetAllocations(ctx, offset) + return allocations, len(allocations), err } func shouldFinalize(sa *transaction.StorageAllocation) bool { @@ -197,7 +184,7 @@ func updateAllocationInDB(ctx context.Context, a *Allocation, sa *transaction.St ctx = datastore.GetStore().CreateTransaction(ctx) var tx = datastore.GetStore().GetTransaction(ctx) - defer commit(tx, &err) + defer commit(tx.DB, &err) var changed bool = a.Tx != sa.Tx @@ -224,7 +211,7 @@ func updateAllocationInDB(ctx context.Context, a *Allocation, sa *transaction.St } // save allocations - if err := tx.Save(a).Error; err != nil { + if err := Repo.Save(ctx, a); err != nil { return nil, err } @@ -239,6 +226,7 @@ func updateAllocationInDB(ctx context.Context, a *Allocation, sa *transaction.St } } + logging.Logger.Info("allocation updated", zap.String("id", a.ID), zap.Any("a", a)) return a, nil // ok } @@ -276,7 +264,7 @@ func cleanupAllocation(ctx context.Context, a *Allocation) { ctx = datastore.GetStore().CreateTransaction(ctx) var tx = datastore.GetStore().GetTransaction(ctx) - defer commit(tx, &err) + defer commit(tx.DB, &err) a.CleanedUp = true if err = tx.Model(a).Updates(a).Error; err != nil { @@ -287,7 +275,7 @@ func cleanupAllocation(ctx context.Context, a *Allocation) { func deleteAllocation(ctx context.Context, a *Allocation) (err error) { ctx = datastore.GetStore().CreateTransaction(ctx) var tx = datastore.GetStore().GetTransaction(ctx) - defer commit(tx, &err) + defer commit(tx.DB, &err) filestore.GetFileStore().DeleteAllocation(a.ID) err = tx.Model(&reference.Ref{}).Unscoped(). diff --git a/code/go/0chain.net/blobbercore/allocation/zcn.go b/code/go/0chain.net/blobbercore/allocation/zcn.go index 0dff85ceb..3ff8ac26b 100644 --- a/code/go/0chain.net/blobbercore/allocation/zcn.go +++ b/code/go/0chain.net/blobbercore/allocation/zcn.go @@ -1,12 +1,16 @@ package allocation import ( + "context" + "math" + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/common" + "github.com/0chain/blobber/code/go/0chain.net/core/logging" "github.com/0chain/blobber/code/go/0chain.net/core/node" "github.com/0chain/errors" + "go.uber.org/zap" "gorm.io/gorm" - "math" ) // SyncAllocation try to pull allocation from blockchain, and insert it in db. @@ -62,7 +66,8 @@ func SyncAllocation(allocationId string) (*Allocation, error) { } err = datastore.GetStore().GetDB().Transaction(func(tx *gorm.DB) error { - if err := tx.Table(TableNameAllocation).Save(alloc).Error; err != nil { + ctx := datastore.GetStore().WithTransaction(context.Background(), tx) + if err := Repo.Save(ctx, alloc); err != nil { return err } @@ -75,5 +80,15 @@ func SyncAllocation(allocationId string) (*Allocation, error) { return nil }) + if err != nil { + return nil, errors.Throw(err, "meta_data_update_error", err.Error()) + } + + logging.Logger.Info("Saving the allocation to DB", zap.Any( + "allocation", alloc), zap.Error(err)) + if err != nil { + return nil, err + } + return alloc, err } diff --git a/code/go/0chain.net/blobbercore/challenge/entity.go b/code/go/0chain.net/blobbercore/challenge/entity.go index d30fb28f2..6218673fa 100644 --- a/code/go/0chain.net/blobbercore/challenge/entity.go +++ b/code/go/0chain.net/blobbercore/challenge/entity.go @@ -141,7 +141,7 @@ func unMarshalField(stringObj datatypes.JSON, dest interface{}) error { func (cr *ChallengeEntity) Save(ctx context.Context) error { db := datastore.GetStore().GetTransaction(ctx) - return cr.SaveWith(db) + return cr.SaveWith(db.DB) } func (cr *ChallengeEntity) SaveWith(db *gorm.DB) error { diff --git a/code/go/0chain.net/blobbercore/challenge/protocol.go b/code/go/0chain.net/blobbercore/challenge/protocol.go index 67a853300..9f5ecac0e 100644 --- a/code/go/0chain.net/blobbercore/challenge/protocol.go +++ b/code/go/0chain.net/blobbercore/challenge/protocol.go @@ -255,7 +255,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { resp, err := util.SendPostRequest(url, postDataBytes, nil) if err != nil { //network issue, don't cancel it, and try it again - logging.Logger.Info("[challenge]post: ", zap.Any("error", err.Error())) + logging.Logger.Error("[challenge]post: ", zap.Any("error", err.Error())) updateMapAndSlice(validatorID, i, nil) return } diff --git a/code/go/0chain.net/blobbercore/challenge/timing.go b/code/go/0chain.net/blobbercore/challenge/timing.go index 80282d08e..938e237f7 100644 --- a/code/go/0chain.net/blobbercore/challenge/timing.go +++ b/code/go/0chain.net/blobbercore/challenge/timing.go @@ -167,3 +167,10 @@ func GetChallengeTimings(from common.Timestamp, limit common.Pagination) ([]*Cha } return chs, nil } + +func GetChallengeTiming(challengeID string) (*ChallengeTiming, error) { + var ch *ChallengeTiming + + err := datastore.GetStore().GetDB().Model(&ChallengeTiming{}).Where("challenge_id = ?", challengeID).First(&ch).Error + return ch, err +} diff --git a/code/go/0chain.net/blobbercore/challenge/worker.go b/code/go/0chain.net/blobbercore/challenge/worker.go index 4c2b01d6f..509500819 100644 --- a/code/go/0chain.net/blobbercore/challenge/worker.go +++ b/code/go/0chain.net/blobbercore/challenge/worker.go @@ -6,6 +6,7 @@ import ( "time" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/logging" "github.com/emirpasic/gods/maps/treemap" "go.uber.org/zap" @@ -177,12 +178,21 @@ func getBatch(batchSize int) (chall []ChallengeEntity) { func (it *ChallengeEntity) createChallenge() bool { challengeMapLock.Lock() + defer challengeMapLock.Unlock() if _, ok := challengeMap.Get(it.RoundCreatedAt); ok { - challengeMapLock.Unlock() + return false + } + db := datastore.GetStore().GetDB() + var Found bool + err := db.Raw("SELECT EXISTS(SELECT 1 FROM challenge_timing WHERE challenge_id = ?) AS found", it.ChallengeID).Scan(&Found).Error + if err != nil { + logging.Logger.Error("createChallenge", zap.Error(err)) + return false + } else if Found { + logging.Logger.Info("createChallenge", zap.String("challenge_id", it.ChallengeID), zap.String("status", "already exists")) return false } challengeMap.Put(it.RoundCreatedAt, it) - challengeMapLock.Unlock() return true } diff --git a/code/go/0chain.net/blobbercore/datastore/mocket.go b/code/go/0chain.net/blobbercore/datastore/mocket.go index 6575e8876..de975d32f 100644 --- a/code/go/0chain.net/blobbercore/datastore/mocket.go +++ b/code/go/0chain.net/blobbercore/datastore/mocket.go @@ -73,18 +73,22 @@ func (store *Mocket) Close() { func (store *Mocket) CreateTransaction(ctx context.Context) context.Context { db := store.db.Begin() - return context.WithValue(ctx, ContextKeyTransaction, db) + return context.WithValue(ctx, ContextKeyTransaction, EnhanceDB(db)) } -func (store *Mocket) GetTransaction(ctx context.Context) *gorm.DB { +func (store *Mocket) GetTransaction(ctx context.Context) *EnhancedDB { conn := ctx.Value(ContextKeyTransaction) if conn != nil { - return conn.(*gorm.DB) + return conn.(*EnhancedDB) } Logger.Error("No connection in the context.") return nil } +func (store *Mocket) WithTransaction(ctx context.Context, tx *gorm.DB) context.Context { + return context.WithValue(ctx, ContextKeyTransaction, EnhanceDB(tx)) +} + func (store *Mocket) GetDB() *gorm.DB { return store.db } diff --git a/code/go/0chain.net/blobbercore/datastore/postgres.go b/code/go/0chain.net/blobbercore/datastore/postgres.go index 41478e9f3..0d237aaac 100644 --- a/code/go/0chain.net/blobbercore/datastore/postgres.go +++ b/code/go/0chain.net/blobbercore/datastore/postgres.go @@ -85,18 +85,22 @@ func (store *postgresStore) Close() { func (store *postgresStore) CreateTransaction(ctx context.Context) context.Context { db := store.db.Begin() - return context.WithValue(ctx, ContextKeyTransaction, db) + return context.WithValue(ctx, ContextKeyTransaction, EnhanceDB(db)) } -func (store *postgresStore) GetTransaction(ctx context.Context) *gorm.DB { +func (store *postgresStore) GetTransaction(ctx context.Context) *EnhancedDB { conn := ctx.Value(ContextKeyTransaction) if conn != nil { - return conn.(*gorm.DB) + return conn.(*EnhancedDB) } logging.Logger.Error("No connection in the context.") return nil } +func (store *postgresStore) WithTransaction(ctx context.Context, tx *gorm.DB) context.Context { + return context.WithValue(ctx, ContextKeyTransaction, EnhanceDB(tx)) +} + func (store *postgresStore) GetDB() *gorm.DB { return store.db } diff --git a/code/go/0chain.net/blobbercore/datastore/sqlmock.go b/code/go/0chain.net/blobbercore/datastore/sqlmock.go index 606949625..f0fdc6a3f 100644 --- a/code/go/0chain.net/blobbercore/datastore/sqlmock.go +++ b/code/go/0chain.net/blobbercore/datastore/sqlmock.go @@ -71,15 +71,19 @@ func (store *Sqlmock) CreateTransaction(ctx context.Context) context.Context { return context.WithValue(ctx, ContextKeyTransaction, db) } -func (store *Sqlmock) GetTransaction(ctx context.Context) *gorm.DB { +func (store *Sqlmock) GetTransaction(ctx context.Context) *EnhancedDB { conn := ctx.Value(ContextKeyTransaction) if conn != nil { - return conn.(*gorm.DB) + return conn.(*EnhancedDB) } Logger.Error("No connection in the context.") return nil } +func (store *Sqlmock) WithTransaction(ctx context.Context, tx *gorm.DB) context.Context { + return context.WithValue(ctx, ContextKeyTransaction, EnhanceDB(tx)) +} + func (store *Sqlmock) GetDB() *gorm.DB { return store.db } diff --git a/code/go/0chain.net/blobbercore/datastore/store.go b/code/go/0chain.net/blobbercore/datastore/store.go index a950410e6..e24ca8804 100644 --- a/code/go/0chain.net/blobbercore/datastore/store.go +++ b/code/go/0chain.net/blobbercore/datastore/store.go @@ -13,6 +13,16 @@ const ( ContextKeyStore ) +type EnhancedDB struct { + SessionCache map[string]interface{} + *gorm.DB +} + +func EnhanceDB(db *gorm.DB) *EnhancedDB { + cache := make(map[string]interface{}) + return &EnhancedDB{DB: db, SessionCache: cache} +} + type Store interface { // GetDB get raw gorm db @@ -20,8 +30,8 @@ type Store interface { // CreateTransaction create transaction, and save it in context CreateTransaction(ctx context.Context) context.Context // GetTransaction get transaction from context - GetTransaction(ctx context.Context) *gorm.DB - + GetTransaction(ctx context.Context) *EnhancedDB + WithTransaction(ctx context.Context, tx *gorm.DB) context.Context // Get db connection with user that creates roles and databases. Its dialactor does not contain database name GetPgDB() (*gorm.DB, error) Open() error diff --git a/code/go/0chain.net/blobbercore/handler/authticket.go b/code/go/0chain.net/blobbercore/handler/authticket.go index a4d4d6a08..c96abc0ba 100644 --- a/code/go/0chain.net/blobbercore/handler/authticket.go +++ b/code/go/0chain.net/blobbercore/handler/authticket.go @@ -10,11 +10,10 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/readmarker" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" "github.com/0chain/blobber/code/go/0chain.net/core/common" - "gorm.io/gorm" ) // verifyAuthTicket verifies authTicket and returns authToken and error if any. For any error authToken is nil -func verifyAuthTicket(ctx context.Context, db *gorm.DB, authTokenString string, allocationObj *allocation.Allocation, refRequested *reference.Ref, clientID string, verifyShare bool) (*readmarker.AuthTicket, error) { +func verifyAuthTicket(ctx context.Context, authTokenString string, allocationObj *allocation.Allocation, refRequested *reference.Ref, clientID string, verifyShare bool) (*readmarker.AuthTicket, error) { if authTokenString == "" { return nil, common.NewError("invalid_parameters", "Auth ticket is required") } @@ -29,7 +28,7 @@ func verifyAuthTicket(ctx context.Context, db *gorm.DB, authTokenString string, } if refRequested.LookupHash != authToken.FilePathHash { - authTokenRef, err := reference.GetLimitedRefFieldsByLookupHashWith(ctx, db, authToken.AllocationID, authToken.FilePathHash, []string{"id", "path"}) + authTokenRef, err := reference.GetLimitedRefFieldsByLookupHashWith(ctx, authToken.AllocationID, authToken.FilePathHash, []string{"id", "path"}) if err != nil { return nil, err } diff --git a/code/go/0chain.net/blobbercore/handler/challenge_timings.go b/code/go/0chain.net/blobbercore/handler/challenge_timings.go index af02bf711..5ea951a5c 100644 --- a/code/go/0chain.net/blobbercore/handler/challenge_timings.go +++ b/code/go/0chain.net/blobbercore/handler/challenge_timings.go @@ -29,3 +29,11 @@ func GetChallengeTimings(ctx context.Context, r *http.Request) (interface{}, err return challenge.GetChallengeTimings(from, limit) } + +func GetChallengeTiming(ctx context.Context, r *http.Request) (interface{}, error) { + var ( + challengeID = r.URL.Query().Get("challenge_id") + ) + + return challenge.GetChallengeTiming(challengeID) +} diff --git a/code/go/0chain.net/blobbercore/handler/handler.go b/code/go/0chain.net/blobbercore/handler/handler.go index cb791b655..7e083f0cd 100644 --- a/code/go/0chain.net/blobbercore/handler/handler.go +++ b/code/go/0chain.net/blobbercore/handler/handler.go @@ -229,6 +229,7 @@ func setupHandlers(r *mux.Router) { r.HandleFunc("/getstats", RateLimitByCommmitRL(common.ToJSONResponse(stats.GetStatsHandler))) // r.HandleFunc("/challengetimings", common.AuthenticateAdmin(common.ToJSONResponse(GetChallengeTimings))) r.HandleFunc("/challengetimings", RateLimitByCommmitRL(common.ToJSONResponse(GetChallengeTimings))) + r.HandleFunc("/challenge-timings-by-challengeId", RateLimitByCommmitRL(common.ToJSONResponse(GetChallengeTiming))) //marketplace related r.HandleFunc("/v1/marketplace/shareinfo/{allocation}", diff --git a/code/go/0chain.net/blobbercore/handler/handler_playlist.go b/code/go/0chain.net/blobbercore/handler/handler_playlist.go index 0fd0505c1..4cb4637b3 100644 --- a/code/go/0chain.net/blobbercore/handler/handler_playlist.go +++ b/code/go/0chain.net/blobbercore/handler/handler_playlist.go @@ -25,7 +25,7 @@ func LoadPlaylist(ctx *Context) (interface{}, error) { return nil, errors.New("lookup_hash_missed: auth_token and lookup_hash are required") } - fileRef, err := reference.GetLimitedRefFieldsByLookupHashWith(ctx, ctx.Store.GetDB(), ctx.AllocationId, lookupHash, []string{"id", "path", "lookup_hash", "type", "name"}) + fileRef, err := reference.GetLimitedRefFieldsByLookupHashWith(ctx, ctx.AllocationId, lookupHash, []string{"id", "path", "lookup_hash", "type", "name"}) if err != nil { return nil, common.NewError("invalid_lookup_hash", err.Error()) } @@ -35,7 +35,7 @@ func LoadPlaylist(ctx *Context) (interface{}, error) { return nil, common.NewError("invalid_auth_ticket", err.Error()) } - authToken, err := verifyAuthTicket(ctx, ctx.Store.GetDB(), string(at), ctx.Allocation, fileRef, ctx.ClientID, true) + authToken, err := verifyAuthTicket(ctx, string(at), ctx.Allocation, fileRef, ctx.ClientID, true) if err != nil { return nil, err } @@ -67,7 +67,7 @@ func LoadPlaylistFile(ctx *Context) (interface{}, error) { //load playlist with auth ticket if len(authTokenString) > 0 { - fileRef, err := reference.GetLimitedRefFieldsByLookupHashWith(ctx, ctx.Store.GetDB(), ctx.AllocationId, lookupHash, []string{"id", "path", "lookup_hash", "type", "name"}) + fileRef, err := reference.GetLimitedRefFieldsByLookupHashWith(ctx, ctx.AllocationId, lookupHash, []string{"id", "path", "lookup_hash", "type", "name"}) if err != nil { return nil, common.NewError("invalid_lookup_hash", err.Error()) } @@ -75,7 +75,7 @@ func LoadPlaylistFile(ctx *Context) (interface{}, error) { if err != nil { return nil, common.NewError("invalid_auth_ticket", err.Error()) } - authToken, err := verifyAuthTicket(ctx, ctx.Store.GetDB(), string(at), ctx.Allocation, fileRef, ctx.ClientID, true) + authToken, err := verifyAuthTicket(ctx, string(at), ctx.Allocation, fileRef, ctx.ClientID, true) if err != nil { return nil, err } diff --git a/code/go/0chain.net/blobbercore/handler/handler_writemarker.go b/code/go/0chain.net/blobbercore/handler/handler_writemarker.go index f8dccbabc..5704862a8 100644 --- a/code/go/0chain.net/blobbercore/handler/handler_writemarker.go +++ b/code/go/0chain.net/blobbercore/handler/handler_writemarker.go @@ -3,6 +3,8 @@ package handler import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker" "github.com/0chain/blobber/code/go/0chain.net/core/common" + . "github.com/0chain/blobber/code/go/0chain.net/core/logging" + "go.uber.org/zap" ) var WriteMarkerMutext = &writemarker.Mutex{ @@ -14,6 +16,7 @@ func LockWriteMarker(ctx *Context) (interface{}, error) { connectionID, _ := ctx.FormValue("connection_id") result, err := WriteMarkerMutext.Lock(ctx, ctx.AllocationId, connectionID) + Logger.Info("Lock write marker result", zap.Any("result", result), zap.Error(err)) if err != nil { return nil, err } diff --git a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go index e2f8d4c08..4e58e8526 100644 --- a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go +++ b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go @@ -27,10 +27,8 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/core/lock" "github.com/0chain/blobber/code/go/0chain.net/core/node" - "gorm.io/gorm" - "gorm.io/gorm/clause" - "go.uber.org/zap" + "gorm.io/gorm" . "github.com/0chain/blobber/code/go/0chain.net/core/logging" ) @@ -52,7 +50,6 @@ func readPreRedeem( // check out read pool tokens if read_price > 0 var ( - db = datastore.GetStore().GetTransaction(ctx) blobberID = node.Self.ID ) @@ -60,7 +57,7 @@ func readPreRedeem( return // skip if read price is zero } - readPoolBalance, err := allocation.GetReadPoolsBalance(db, payerID) + readPoolBalance, err := allocation.GetReadPoolsBalance(ctx, payerID) if err != nil { return common.NewError("read_pre_redeem", "database error while reading read pools balance") } @@ -73,7 +70,7 @@ func readPreRedeem( } rp.ClientID = payerID - err = allocation.UpdateReadPool(db, rp) + err = allocation.UpdateReadPool(ctx, rp) if err != nil { return common.NewErrorf("read_pre_redeem", "can't save requested read pools: %v", err) } @@ -108,7 +105,6 @@ func checkPendingMarkers(ctx context.Context, allocationID string) error { func writePreRedeem(ctx context.Context, alloc *allocation.Allocation, writeMarker *writemarker.WriteMarker, payerID string) (err error) { // check out read pool tokens if read_price > 0 var ( - db = datastore.GetStore().GetTransaction(ctx) blobberID = node.Self.ID requiredBalance = alloc.GetRequiredWriteBalance(blobberID, writeMarker.Size, writeMarker.Timestamp) wp *allocation.WritePool @@ -118,13 +114,13 @@ func writePreRedeem(ctx context.Context, alloc *allocation.Allocation, writeMark return } - writePoolBalance, err := allocation.GetWritePoolsBalance(db, alloc.ID) + writePoolBalance, err := allocation.GetWritePoolsBalance(ctx, alloc.ID) if err != nil { Logger.Error("write_pre_redeem:get_write_pools_balance", zap.Error(err), zap.String("allocation_id", alloc.ID)) return common.NewError("write_pre_redeem", "database error while getting write pool balance") } - pendingWriteSize, err := allocation.GetPendingWrite(db, payerID, alloc.ID) + pendingWriteSize, err := allocation.GetPendingWrite(ctx, payerID, alloc.ID) if err != nil { escapedPayerID := sanitizeString(payerID) Logger.Error("write_pre_redeem:get_pending_write", zap.Error(err), zap.String("allocation_id", alloc.ID), zap.String("payer_id", escapedPayerID)) @@ -139,7 +135,7 @@ func writePreRedeem(ctx context.Context, alloc *allocation.Allocation, writeMark return common.NewErrorf("write_pre_redeem", "can't request write pools from sharders: %v", err) } - err = allocation.SetWritePool(db, alloc.ID, wp) + err = allocation.SetWritePool(ctx, alloc.ID, wp) if err != nil { return common.NewErrorf("write_pre_redeem", "can't save requested write pools: %v", err) } @@ -154,7 +150,7 @@ func writePreRedeem(ctx context.Context, alloc *allocation.Allocation, writeMark alloc.ID, writeMarker.BlobberID, writePoolBalance, requiredBalance) } - if err := allocation.AddToPending(db, payerID, alloc.ID, writeMarker.Size); err != nil { + if err := allocation.AddToPending(ctx, payerID, alloc.ID, writeMarker.Size); err != nil { Logger.Error(err.Error()) return common.NewErrorf("write_pre_redeem", "can't save pending writes in DB") @@ -654,17 +650,16 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b if err = db.Create(writemarkerEntity).Error; err != nil { return nil, common.NewError("write_marker_error", "Error persisting the write marker") } + allocationObj.AllocationRoot = allocationRoot + allocationObj.FileMetaRoot = fileMetaRoot + allocationObj.IsRedeemRequired = true + allocationObj.BlobberSizeUsed += connectionObj.Size + allocationObj.UsedSize += connectionObj.Size - allocationUpdates := make(map[string]interface{}) - allocationUpdates["blobber_size_used"] = gorm.Expr("blobber_size_used + ?", connectionObj.Size) - allocationUpdates["used_size"] = gorm.Expr("used_size + ?", connectionObj.Size) - allocationUpdates["allocation_root"] = allocationRoot - allocationUpdates["file_meta_root"] = fileMetaRoot - allocationUpdates["is_redeem_required"] = true - - if err = db.Model(allocationObj).Updates(allocationUpdates).Error; err != nil { + if err = allocation.Repo.Save(ctx, allocationObj); err != nil { return nil, common.NewError("allocation_write_error", "Error persisting the allocation object") } + err = connectionObj.CommitToFileStore(ctx) if err != nil { if !errors.Is(common.ErrFileWasDeleted, err) { @@ -1361,16 +1356,20 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob } elapsedWritePreRedeem := time.Since(startTime) - elapsedAllocation - elapsedGetLock - elapsedVerifyWM - - err = allocation.ApplyRollback(ctx, allocationID) + c := datastore.GetStore().CreateTransaction(context.TODO()) + defer c.Done() + txn := datastore.GetStore().GetTransaction(c) + err = allocation.ApplyRollback(c, allocationID) if err != nil { + txn.Rollback() return nil, common.NewError("allocation_rollback_error", "Error applying the rollback for allocation: "+err.Error()) } elapsedApplyRollback := time.Since(startTime) - elapsedAllocation - elapsedGetLock - elapsedVerifyWM - elapsedWritePreRedeem //get allocation root and ref - rootRef, err := reference.GetLimitedRefFieldsByPath(ctx, allocationID, "/", []string{"hash", "file_meta_hash", "is_precommit"}) + rootRef, err := reference.GetLimitedRefFieldsByPath(c, allocationID, "/", []string{"hash", "file_meta_hash", "is_precommit"}) if err != nil && err != gorm.ErrRecordNotFound { + txn.Rollback() return nil, common.NewError("root_ref_read_error", "Error reading the root reference: "+err.Error()) } if err == gorm.ErrRecordNotFound { @@ -1389,6 +1388,7 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob result.Success = false result.ErrorMessage = "Allocation root in the write marker does not match the calculated allocation root." + " Expected hash: " + allocationRoot + txn.Rollback() return &result, common.NewError("allocation_root_mismatch", result.ErrorMessage) } @@ -1399,6 +1399,7 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob result.Success = false result.ErrorMessage = "File meta root in the write marker does not match the calculated file meta root." + " Expected hash: " + fileMetaRoot + "; Got: " + writeMarker.FileMetaRoot + txn.Rollback() return &result, common.NewError("file_meta_root_mismatch", result.ErrorMessage) } @@ -1406,42 +1407,36 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob writemarkerEntity.ClientPublicKey = clientKey Logger.Info("rollback_writemarker", zap.Any("writemarker", writemarkerEntity.WM)) - db := datastore.GetStore().GetDB() - alloc := &allocation.Allocation{} - - err = db.Transaction(func(tx *gorm.DB) error { - - err := tx.Model(&allocation.Allocation{}).Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).Select("is_redeem_required").Where("id=?", allocationID).First(alloc).Error - if err != nil { - return common.NewError("allocation_read_error", "Error reading the allocation object") - } - - allocationUpdates := make(map[string]interface{}) - allocationUpdates["blobber_size_used"] = gorm.Expr("blobber_size_used - ?", latestWriteMarkerEntity.WM.Size) - allocationUpdates["used_size"] = gorm.Expr("used_size - ?", latestWriteMarkerEntity.WM.Size) - allocationUpdates["is_redeem_required"] = true - allocationUpdates["allocation_root"] = allocationRoot - allocationUpdates["file_meta_root"] = fileMetaRoot + alloc, err := allocation.Repo.GetByIdAndLock(c, allocationID) + if err != nil { + txn.Rollback() + return &result, common.NewError("allocation_read_error", "Error reading the allocation object") + } - if alloc.IsRedeemRequired { - writemarkerEntity.Status = writemarker.Rollbacked - allocationUpdates["is_redeem_required"] = false - } - err = tx.Create(writemarkerEntity).Error - if err != nil { - return common.NewError("write_marker_error", "Error persisting the write marker "+err.Error()) - } + alloc.BlobberSizeUsed -= latestWriteMarkerEntity.WM.Size + alloc.UsedSize -= latestWriteMarkerEntity.WM.Size + alloc.AllocationRoot = allocationRoot + alloc.FileMetaRoot = fileMetaRoot - if err = tx.Model(allocationObj).Updates(allocationUpdates).Error; err != nil { - return common.NewError("allocation_write_error", "Error persisting the allocation object "+err.Error()) - } + if alloc.IsRedeemRequired { + writemarkerEntity.Status = writemarker.Rollbacked + alloc.IsRedeemRequired = false + } + err = txn.Create(writemarkerEntity).Error + if err != nil { + txn.Rollback() + return &result, common.NewError("write_marker_error", "Error persisting the write marker "+err.Error()) + } + if err = allocation.Repo.Save(c, alloc); err != nil { + txn.Rollback() + return &result, common.NewError("allocation_write_error", "Error persisting the allocation object "+err.Error()) + } - return nil - }) + err = txn.Commit().Error if err != nil { - return nil, err + return &result, common.NewError("allocation_commit_error", "Error committing the transaction "+err.Error()) } - if !alloc.IsRedeemRequired { + if alloc.IsRedeemRequired { err = writemarkerEntity.SendToChan(ctx) if err != nil { return nil, common.NewError("write_marker_error", "Error redeeming the write marker") diff --git a/code/go/0chain.net/blobbercore/handler/object_operation_handler_test.go b/code/go/0chain.net/blobbercore/handler/object_operation_handler_test.go index 201ceda0c..ced24036d 100644 --- a/code/go/0chain.net/blobbercore/handler/object_operation_handler_test.go +++ b/code/go/0chain.net/blobbercore/handler/object_operation_handler_test.go @@ -290,8 +290,8 @@ func TestDownloadFile(t *testing.T) { ctx = context.WithValue(ctx, constants.ContextKeyAllocation, p.inData.allocationTx) ctx = context.WithValue(ctx, constants.ContextKeyClientKey, client.GetClientPublicKey()) - db := datastore.GetStore().GetDB().Begin() - ctx = context.WithValue(ctx, datastore.ContextKeyTransaction, db) + ctx = datastore.GetStore().CreateTransaction(ctx) + return ctx } diff --git a/code/go/0chain.net/blobbercore/handler/storage_handler.go b/code/go/0chain.net/blobbercore/handler/storage_handler.go index 557294497..8e3cfad68 100644 --- a/code/go/0chain.net/blobbercore/handler/storage_handler.go +++ b/code/go/0chain.net/blobbercore/handler/storage_handler.go @@ -9,11 +9,11 @@ import ( "regexp" "strconv" + "github.com/0chain/blobber/code/go/0chain.net/core/logging" + "gorm.io/gorm" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" - "github.com/0chain/gosdk/constants" "go.uber.org/zap" @@ -50,6 +50,8 @@ func (fsh *StorageHandler) verifyAllocation(ctx context.Context, allocationID, a "verifying allocation transaction error: %v", err) } + logging.Logger.Info("verifyAllocation", zap.Any("alloc", alloc), zap.Any("now", common.Now())) + if alloc.Expiration < common.Now() { return nil, common.NewError("verify_allocation", "use of expired allocation") @@ -70,10 +72,7 @@ func (fsh *StorageHandler) convertGormError(err error) error { // verifyAuthTicket verifies authTicket and returns authToken and error if any. For any error authToken is nil func (fsh *StorageHandler) verifyAuthTicket(ctx context.Context, authTokenString string, allocationObj *allocation.Allocation, refRequested *reference.Ref, clientID string, verifyShare bool) (*readmarker.AuthTicket, error) { - - db := datastore.GetStore().GetTransaction(ctx) - - return verifyAuthTicket(ctx, db, authTokenString, allocationObj, refRequested, clientID, verifyShare) + return verifyAuthTicket(ctx, authTokenString, allocationObj, refRequested, clientID, verifyShare) } func (fsh *StorageHandler) GetAllocationDetails(ctx context.Context, r *http.Request) (interface{}, error) { diff --git a/code/go/0chain.net/blobbercore/handler/worker.go b/code/go/0chain.net/blobbercore/handler/worker.go index 127f90812..2634b148e 100644 --- a/code/go/0chain.net/blobbercore/handler/worker.go +++ b/code/go/0chain.net/blobbercore/handler/worker.go @@ -4,14 +4,12 @@ import ( "context" "time" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" - "github.com/0chain/blobber/code/go/0chain.net/core/lock" - "gorm.io/gorm" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore" + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" + "github.com/0chain/blobber/code/go/0chain.net/core/lock" "github.com/0chain/blobber/code/go/0chain.net/core/logging" "go.uber.org/zap" @@ -27,15 +25,16 @@ func CleanupDiskFiles(ctx context.Context) error { db.Find(&allocations) for _, allocationObj := range allocations { - cleanupAllocationFiles(db, allocationObj) + cleanupAllocationFiles(ctx, allocationObj) } return nil } -func cleanupAllocationFiles(db *gorm.DB, allocationObj allocation.Allocation) { +func cleanupAllocationFiles(ctx context.Context, allocationObj allocation.Allocation) { mutex := lock.GetMutex(allocationObj.TableName(), allocationObj.ID) mutex.Lock() defer mutex.Unlock() + db := datastore.GetStore().GetTransaction(ctx) _ = filestore.GetFileStore().IterateObjects(allocationObj.ID, func(hash string, contentSize int64) { var refs []reference.Ref diff --git a/code/go/0chain.net/blobbercore/readmarker/readmarker.go b/code/go/0chain.net/blobbercore/readmarker/readmarker.go index e49ec747a..7d90819ef 100644 --- a/code/go/0chain.net/blobbercore/readmarker/readmarker.go +++ b/code/go/0chain.net/blobbercore/readmarker/readmarker.go @@ -184,7 +184,6 @@ func SaveLatestReadMarker(ctx context.Context, rm *ReadMarker, latestRedeemedRC // Sync read marker with 0chain to be sure its correct. func (rm *ReadMarkerEntity) Sync(ctx context.Context) error { - var db = datastore.GetStore().GetTransaction(ctx) // update local read pools cache from sharders rp, err := allocation.RequestReadPoolStat(rm.LatestRM.ClientID) if err != nil { @@ -192,7 +191,7 @@ func (rm *ReadMarkerEntity) Sync(ctx context.Context) error { } // save the fresh read pools information - err = allocation.UpsertReadPool(db, rp) + err = allocation.UpsertReadPool(ctx, rp) if err != nil { return common.NewErrorf("rme_sync", "can't update read pools from sharders: %v", err) } @@ -222,7 +221,7 @@ func (rme *ReadMarkerEntity) UpdateStatus(ctx context.Context, txOutput, redeemT return common.NewErrorf("rme_update_status", "can't get read pools from sharders: %v", err) } - if err := allocation.UpdateReadPool(db, rp); err != nil { + if err := allocation.UpdateReadPool(ctx, rp); err != nil { return common.NewErrorf("rme_update_status", "can't update local read pools cache: %v", err) } diff --git a/code/go/0chain.net/blobbercore/reference/ref.go b/code/go/0chain.net/blobbercore/reference/ref.go index aab633054..7cce69d95 100644 --- a/code/go/0chain.net/blobbercore/reference/ref.go +++ b/code/go/0chain.net/blobbercore/reference/ref.go @@ -193,8 +193,8 @@ func GetReference(ctx context.Context, allocationID, path string) (*Ref, error) // GetLimitedRefFieldsByPath get FileRef selected fields with allocationID and path from postgres func GetLimitedRefFieldsByPath(ctx context.Context, allocationID, path string, selectedFields []string) (*Ref, error) { ref := &Ref{} - db := datastore.GetStore().GetTransaction(ctx) - db = db.Select(selectedFields) + t := datastore.GetStore().GetTransaction(ctx) + db := t.Select(selectedFields) err := db.Where(&Ref{AllocationID: allocationID, Path: path}).First(ref).Error if err != nil { return nil, err @@ -203,8 +203,9 @@ func GetLimitedRefFieldsByPath(ctx context.Context, allocationID, path string, s } // GetLimitedRefFieldsByLookupHash get FileRef selected fields with allocationID and lookupHash from postgres -func GetLimitedRefFieldsByLookupHashWith(ctx context.Context, db *gorm.DB, allocationID, lookupHash string, selectedFields []string) (*Ref, error) { +func GetLimitedRefFieldsByLookupHashWith(ctx context.Context, allocationID, lookupHash string, selectedFields []string) (*Ref, error) { ref := &Ref{} + db := datastore.GetStore().GetTransaction(ctx) err := db. Select(selectedFields). @@ -220,8 +221,8 @@ func GetLimitedRefFieldsByLookupHashWith(ctx context.Context, db *gorm.DB, alloc // GetLimitedRefFieldsByLookupHash get FileRef selected fields with allocationID and lookupHash from postgres func GetLimitedRefFieldsByLookupHash(ctx context.Context, allocationID, lookupHash string, selectedFields []string) (*Ref, error) { ref := &Ref{} - db := datastore.GetStore().GetTransaction(ctx) - db = db.Select(selectedFields) + t := datastore.GetStore().GetTransaction(ctx) + db := t.Select(selectedFields) err := db.Where(&Ref{AllocationID: allocationID, LookupHash: lookupHash}).First(ref).Error if err != nil { return nil, err @@ -286,8 +287,8 @@ func GetRefsTypeFromPaths(ctx context.Context, allocationID string, paths []stri return } - db := datastore.GetStore().GetTransaction(ctx) - db = db.Select("path", "type") + t := datastore.GetStore().GetTransaction(ctx) + db := t.Select("path", "type") for _, p := range paths { db = db.Or(Ref{AllocationID: allocationID, Path: p}) } @@ -314,8 +315,8 @@ func GetSubDirsFromPath(p string) []string { func GetRefWithChildren(ctx context.Context, allocationID, path string) (*Ref, error) { var refs []Ref - db := datastore.GetStore().GetTransaction(ctx) - db = db.Where(Ref{ParentPath: path, AllocationID: allocationID}).Or(Ref{Type: DIRECTORY, Path: path, AllocationID: allocationID}) + t := datastore.GetStore().GetTransaction(ctx) + db := t.Where(Ref{ParentPath: path, AllocationID: allocationID}).Or(Ref{Type: DIRECTORY, Path: path, AllocationID: allocationID}) err := db.Order("path").Find(&refs).Error if err != nil { return nil, err @@ -339,8 +340,8 @@ func GetRefWithChildren(ctx context.Context, allocationID, path string) (*Ref, e func GetRefWithSortedChildren(ctx context.Context, allocationID, path string) (*Ref, error) { var refs []*Ref - db := datastore.GetStore().GetTransaction(ctx) - db = db.Where( + t := datastore.GetStore().GetTransaction(ctx) + db := t.Where( Ref{ParentPath: path, AllocationID: allocationID}). Or(Ref{Type: DIRECTORY, Path: path, AllocationID: allocationID}) diff --git a/code/go/0chain.net/blobbercore/reference/referencepath.go b/code/go/0chain.net/blobbercore/reference/referencepath.go index a9e9e60be..1f5176a1b 100644 --- a/code/go/0chain.net/blobbercore/reference/referencepath.go +++ b/code/go/0chain.net/blobbercore/reference/referencepath.go @@ -25,8 +25,8 @@ func GetReferencePath(ctx context.Context, allocationID, path string) (*Ref, err // GetReferenceForHashCalculationFromPaths validate and build full dir tree from db, and CalculateHash and return root Ref without saving in DB func GetReferenceForHashCalculationFromPaths(ctx context.Context, allocationID string, paths []string) (*Ref, error) { var refs []Ref - db := datastore.GetStore().GetTransaction(ctx) - db = db.Model(&Ref{}).Select("id", "allocation_id", "type", "name", "path", "parent_path", "size", "hash", "file_meta_hash", + t := datastore.GetStore().GetTransaction(ctx) + db := t.Model(&Ref{}).Select("id", "allocation_id", "type", "name", "path", "parent_path", "size", "hash", "file_meta_hash", "path_hash", "validation_root", "fixed_merkle_root", "actual_file_size", "actual_file_hash", "chunk_size", "lookup_hash", "thumbnail_hash", "allocation_root", "level", "created_at", "updated_at", "file_id") @@ -122,7 +122,9 @@ func (rootRef *Ref) GetSrcPath(path string) (*Ref, error) { // GetReferencePathFromPaths validate and build full dir tree from db, and CalculateHash and return root Ref func GetReferencePathFromPaths(ctx context.Context, allocationID string, paths, objTreePath []string) (*Ref, error) { var refs []Ref - db := datastore.GetStore().GetTransaction(ctx) + t := datastore.GetStore().GetTransaction(ctx) + db := t.DB + pathsAdded := make(map[string]bool) var shouldOr bool for _, path := range paths { @@ -199,8 +201,8 @@ func GetReferencePathFromPaths(ctx context.Context, allocationID string, paths, func GetObjectTree(ctx context.Context, allocationID, path string) (*Ref, error) { path = filepath.Clean(path) var refs []Ref - db := datastore.GetStore().GetTransaction(ctx) - db = db.Where(Ref{Path: path, AllocationID: allocationID}) + t := datastore.GetStore().GetTransaction(ctx) + db := t.Where(Ref{Path: path, AllocationID: allocationID}) if path != "/" { db = db.Or("path LIKE ? AND allocation_id = ?", path+"/%", allocationID) } else { diff --git a/code/go/0chain.net/blobbercore/writemarker/entity.go b/code/go/0chain.net/blobbercore/writemarker/entity.go index cbe12e1d0..e99e5d01c 100644 --- a/code/go/0chain.net/blobbercore/writemarker/entity.go +++ b/code/go/0chain.net/blobbercore/writemarker/entity.go @@ -101,7 +101,7 @@ func (wm *WriteMarkerEntity) UpdateStatus(ctx context.Context, status WriteMarke } // work on pre-redeemed tokens and write-pools balances tracking - if err := allocation.AddToPending(db, wm.WM.ClientID, wm.WM.AllocationID, -wm.WM.Size); err != nil { + if err := allocation.AddToPending(ctx, wm.WM.ClientID, wm.WM.AllocationID, -wm.WM.Size); err != nil { return fmt.Errorf("can't save allocation pending value: %v", err) } return diff --git a/code/go/0chain.net/blobbercore/writemarker/mutex.go b/code/go/0chain.net/blobbercore/writemarker/mutex.go index 474fd1ae4..f3d56c296 100644 --- a/code/go/0chain.net/blobbercore/writemarker/mutex.go +++ b/code/go/0chain.net/blobbercore/writemarker/mutex.go @@ -7,8 +7,10 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/common" + "github.com/0chain/blobber/code/go/0chain.net/core/logging" "github.com/0chain/errors" "github.com/0chain/gosdk/constants" + "go.uber.org/zap" "gorm.io/gorm" ) @@ -37,6 +39,7 @@ type Mutex struct { // If lock exists and is of same connection ID then lock's createdAt is updated // If lock exists and is of other connection ID then `pending` response is sent. func (m *Mutex) Lock(ctx context.Context, allocationID, connectionID string) (*LockResult, error) { + logging.Logger.Info("Locking write marker", zap.String("allocation_id", allocationID), zap.String("connection_id", connectionID)) if allocationID == "" { return nil, errors.Throw(constants.ErrInvalidParameter, "allocationID") } @@ -55,6 +58,7 @@ func (m *Mutex) Lock(ctx context.Context, allocationID, connectionID string) (*L err := db.Table(TableNameWriteLock).Where("allocation_id=?", allocationID).First(&lock).Error if err != nil { // new lock + logging.Logger.Info("Creating new lock") if errors.Is(err, gorm.ErrRecordNotFound) { lock = WriteLock{ AllocationID: allocationID, @@ -72,7 +76,7 @@ func (m *Mutex) Lock(ctx context.Context, allocationID, connectionID string) (*L CreatedAt: lock.CreatedAt.Unix(), }, nil } - + logging.Logger.Error("Could not create lock") //native postgres error return nil, errors.ThrowLog(err.Error(), common.ErrBadDataStore) } diff --git a/code/go/0chain.net/blobbercore/writemarker/mutext_test.go b/code/go/0chain.net/blobbercore/writemarker/mutext_test.go index 03909b368..5055576ad 100644 --- a/code/go/0chain.net/blobbercore/writemarker/mutext_test.go +++ b/code/go/0chain.net/blobbercore/writemarker/mutext_test.go @@ -8,10 +8,16 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/common" + "github.com/0chain/blobber/code/go/0chain.net/core/logging" gomocket "github.com/selvatico/go-mocket" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) +func init() { + logging.Logger = zap.NewNop() +} + func TestMutext_LockShouldWork(t *testing.T) { datastore.UseMocket(false) diff --git a/code/go/0chain.net/blobbercore/writemarker/protocol.go b/code/go/0chain.net/blobbercore/writemarker/protocol.go index b11c82366..e8997d595 100644 --- a/code/go/0chain.net/blobbercore/writemarker/protocol.go +++ b/code/go/0chain.net/blobbercore/writemarker/protocol.go @@ -102,7 +102,7 @@ func (wme *WriteMarkerEntity) VerifyMarker(ctx context.Context, dbAllocation *al return nil } -func (wme *WriteMarkerEntity) RedeemMarker(ctx context.Context) error { +func (wme *WriteMarkerEntity) redeemMarker(ctx context.Context) error { if len(wme.CloseTxnID) > 0 { t, err := transaction.VerifyTransaction(wme.CloseTxnID, chain.GetServerChain()) if err == nil { diff --git a/code/go/0chain.net/blobbercore/writemarker/protocol_integration_tests.go b/code/go/0chain.net/blobbercore/writemarker/protocol_integration_tests.go new file mode 100644 index 000000000..7f284eb49 --- /dev/null +++ b/code/go/0chain.net/blobbercore/writemarker/protocol_integration_tests.go @@ -0,0 +1,29 @@ +//go:build integration_tests +// +build integration_tests + +package writemarker + +import ( + "context" + "time" + + "github.com/0chain/blobber/code/go/0chain.net/conductor/conductrpc" + "github.com/0chain/blobber/code/go/0chain.net/core/node" +) + +func (wme *WriteMarkerEntity) RedeemMarker(ctx context.Context) error { + for { + state := conductrpc.Client().State() + if state.StopWMCommit != nil && *state.StopWMCommit { + time.Sleep(time.Second * 5) + continue + } + break + } + err := wme.redeemMarker(ctx) + if err == nil { + // send state to conductor server + conductrpc.Client().BlobberCommitted(node.Self.ID) + } + return err +} diff --git a/code/go/0chain.net/blobbercore/writemarker/protocol_main.go b/code/go/0chain.net/blobbercore/writemarker/protocol_main.go new file mode 100644 index 000000000..77b2ebcbb --- /dev/null +++ b/code/go/0chain.net/blobbercore/writemarker/protocol_main.go @@ -0,0 +1,10 @@ +//go:build !integration_tests +// +build !integration_tests + +package writemarker + +import "context" + +func (wme *WriteMarkerEntity) RedeemMarker(ctx context.Context) error { + return wme.redeemMarker(ctx) +} diff --git a/code/go/0chain.net/blobbercore/writemarker/worker.go b/code/go/0chain.net/blobbercore/writemarker/worker.go index da82e3fa2..ee73ac551 100644 --- a/code/go/0chain.net/blobbercore/writemarker/worker.go +++ b/code/go/0chain.net/blobbercore/writemarker/worker.go @@ -8,11 +8,9 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/logging" + "go.uber.org/zap" "golang.org/x/sync/semaphore" "gorm.io/gorm" - "gorm.io/gorm/clause" - - "go.uber.org/zap" ) var ( @@ -22,14 +20,14 @@ var ( ) func SetupWorkers(ctx context.Context) { - db := datastore.GetStore().GetDB() - type Res struct { - ID string - } - var res []Res + var res []allocation.Res - err := db.Model(&allocation.Allocation{}).Select("id").Find(&res).Error + err := db.Transaction(func(tx *gorm.DB) error { + c := datastore.GetStore().WithTransaction(ctx, tx) + res = allocation.Repo.GetAllocationIds(c) + return nil + }) if err != nil && err != gorm.ErrRecordNotFound { logging.Logger.Error("error_getting_allocations_worker", zap.Any("error", err)) @@ -72,8 +70,7 @@ func redeemWriteMarker(wm *WriteMarkerEntity) error { } } }() - alloc := &allocation.Allocation{} - err := db.Model(&allocation.Allocation{}).Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).Select("allocation_root").Where("id=?", allocationID).First(alloc).Error + alloc, err := allocation.Repo.GetByIdAndLock(ctx, allocationID) if err != nil { logging.Logger.Error("Error redeeming the write marker.", zap.Any("allocation", allocationID), zap.Any("wm", wm.WM.AllocationID), zap.Any("error", err)) go tryAgain(wm) @@ -110,8 +107,7 @@ func redeemWriteMarker(wm *WriteMarkerEntity) error { mut.Release(1) } - err = db.Exec("UPDATE allocations SET latest_redeemed_write_marker=?,is_redeem_required=? WHERE id=?", - wm.WM.AllocationRoot, false, allocationID).Error + err = allocation.Repo.UpdateAllocationRedeem(ctx, wm.WM.AllocationRoot, allocationID) if err != nil { logging.Logger.Error("Error redeeming the write marker. Allocation latest wm redeemed update failed", diff --git a/code/go/0chain.net/conductor/conductrpc/client.go b/code/go/0chain.net/conductor/conductrpc/client.go index 97db9917a..f21410310 100644 --- a/code/go/0chain.net/conductor/conductrpc/client.go +++ b/code/go/0chain.net/conductor/conductrpc/client.go @@ -47,3 +47,13 @@ func (c *client) state(me NodeID) (state *State, err error) { } return } + +func (c *client) blobberCommitted(blobberID string) (err error) { + err = c.client.Call("Server.BlobberCommitted", blobberID, nil) + return +} + +func (c *client) sendFileMetaRoot(m map[string]string) (err error) { + err = c.client.Call("Server.GetFileMetaRoot", m, nil) + return +} diff --git a/code/go/0chain.net/conductor/conductrpc/entity.go b/code/go/0chain.net/conductor/conductrpc/entity.go index e37473236..13461df47 100644 --- a/code/go/0chain.net/conductor/conductrpc/entity.go +++ b/code/go/0chain.net/conductor/conductrpc/entity.go @@ -1,6 +1,7 @@ package conductrpc import ( + "context" "log" "sync" "sync/atomic" @@ -31,6 +32,10 @@ func (e *Entity) State() (state *State) { // SetState sets current state. func (e *Entity) SetState(state *State) { e.state.Store(state) // update + + if state.GetFileMetaRoot { + go SendFileMetaRoot() + } } // NewEntity creates RPC client for integration tests. @@ -100,6 +105,22 @@ func (e *Entity) isMonitor() bool { //nolint:unused,deadcode // might be used la return state != nil && state.IsMonitor } +func (e *Entity) BlobberCommitted(blobberID string) { + e.client.blobberCommitted(blobberID) +} + +func (e *Entity) SendFileMetaRoot(blobberID, fileMetaRoot string, ctxCncl context.CancelFunc) { + m := map[string]string{ + "blobber_id": blobberID, + "file_meta_root": fileMetaRoot, + } + err := e.client.sendFileMetaRoot(m) + if err != nil { + return + } + ctxCncl() +} + // // global // @@ -120,17 +141,16 @@ func Shutdown() { // Client returns global Entity to interact with. Use it, for example, // -// var state = conductrpc.Client().State() -// for _, minerID := range miners { -// if state.VRFS.IsBad(state, minerID) { -// // send bad VRFS to this miner -// } else if state.VRFS.IsGood(state, minerID) { -// // send good VRFS to this miner -// } else { -// // don't send a VRFS to this miner -// } -// } -// +// var state = conductrpc.Client().State() +// for _, minerID := range miners { +// if state.VRFS.IsBad(state, minerID) { +// // send bad VRFS to this miner +// } else if state.VRFS.IsGood(state, minerID) { +// // send good VRFS to this miner +// } else { +// // don't send a VRFS to this miner +// } +// } func Client() *Entity { return global } diff --git a/code/go/0chain.net/conductor/conductrpc/file_meta.go b/code/go/0chain.net/conductor/conductrpc/file_meta.go new file mode 100644 index 000000000..b203c5ec6 --- /dev/null +++ b/code/go/0chain.net/conductor/conductrpc/file_meta.go @@ -0,0 +1,64 @@ +package conductrpc + +import ( + "context" + + "log" + + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" + + "github.com/0chain/blobber/code/go/0chain.net/core/node" +) + +// alreadyRunning is simple indicator that given function is running +// no need to acquire mutex lock. It does not matter if at a time it +// somehow runs the given function multiple times. Since it takes some +// time to acquire state from rpc server there is no concurrent running +var alreadyRunning bool + +func SendFileMetaRoot() { + if alreadyRunning { + return + } + alreadyRunning = true + defer func() { + alreadyRunning = false + }() + + ctx, ctxCncl := context.WithCancel(context.TODO()) + defer ctxCncl() + + for { + select { + case <-ctx.Done(): + return + default: + } + + s := global.State() + if s.GetFileMetaRoot { + fmr, err := getFileMetaRoot() + if err != nil { + log.Printf("Error: %v", err) + continue + } + + global.SendFileMetaRoot(node.Self.ID, fmr, ctxCncl) + } + } +} + +func getFileMetaRoot() (string, error) { + db := datastore.GetStore().GetDB() + var fmr string + + // It will work fine because this blobber will have only single allocation + // created by conductor + err := db.Raw("SELECT file_meta_root FROM allocations LIMIT 1").Scan(&fmr).Error + + if err != nil { + return "", err + } + + return fmr, nil +} diff --git a/code/go/0chain.net/conductor/conductrpc/state.go b/code/go/0chain.net/conductor/conductrpc/state.go index 13933050b..f503156a5 100644 --- a/code/go/0chain.net/conductor/conductrpc/state.go +++ b/code/go/0chain.net/conductor/conductrpc/state.go @@ -50,37 +50,8 @@ type State struct { // Nodes maps NodeID -> NodeName. Nodes map[NodeID]NodeName - IsMonitor bool // send monitor events (round, phase, etc) - IsLock bool // node locked - IsRevealed bool // revealed shares - // Byzantine state. Below, if a value is nil, then node behaves as usual - // for it. - // - // Byzantine blockchain - VRFS *config.Bad - RoundTimeout *config.Bad - CompetingBlock *config.Bad - SignOnlyCompetingBlocks *config.Bad - DoubleSpendTransaction *config.Bad - WrongBlockSignHash *config.Bad - WrongBlockSignKey *config.Bad - WrongBlockHash *config.Bad - VerificationTicketGroup *config.Bad - WrongVerificationTicketHash *config.Bad - WrongVerificationTicketKey *config.Bad - WrongNotarizedBlockHash *config.Bad - WrongNotarizedBlockKey *config.Bad - NotarizeOnlyCompetingBlock *config.Bad - NotarizedBlock *config.Bad - // Byzantine blockchain sharders - FinalizedBlock *config.Bad - MagicBlock *config.Bad - VerifyTransaction *config.Bad - // Byzantine View Change - MPK *config.Bad - Shares *config.Bad - Signatures *config.Bad - Publish *config.Bad + IsMonitor bool // send monitor events (round, phase, etc) + IsLock bool // node locked // Blobbers related states StorageTree *config.Bad // blobber sends bad files/tree responses @@ -91,6 +62,9 @@ type State struct { BlobberUpload BlobberUpload BlobberDelete BlobberDelete AdversarialValidator AdversarialValidator + StopWMCommit *bool + FailRenameCommit []string + GetFileMetaRoot bool } // Name returns NodeName by given NodeID.