From 3a1a405e5c5c119e69feac5de332a0cc646f23b8 Mon Sep 17 00:00:00 2001 From: Yury Date: Mon, 7 Aug 2023 10:59:34 +0300 Subject: [PATCH] Fix/replace transaction (#1192) * add rb logs * fix rollback wm check * never replace existent transaction * never replace existent transaction * close transaction * revert * fix checkout build-&-publish-docker-image.yml * configuration * configuration * cleaned up transaction logic * cleaned up transaction logic * cleaned up transaction logic * cleaned up transaction logic * fixed tests * fixed tests --------- Co-authored-by: Hitenjain14 Co-authored-by: shahnawaz-creator <117025384+shahnawaz-creator@users.noreply.github.com> --- code/go/0chain.net/blobber/config.go | 39 ++-- code/go/0chain.net/blobber/http.go | 28 +++ code/go/0chain.net/blobber/worker.go | 7 +- .../allocation/allocationchange.go | 203 +++++++++--------- .../blobbercore/allocation/copyfilechange.go | 2 +- .../0chain.net/blobbercore/allocation/dao.go | 24 +-- .../allocation/file_changer_upload.go | 2 +- .../blobbercore/allocation/newdirchange.go | 2 +- .../blobbercore/allocation/workers.go | 30 +-- .../0chain.net/blobbercore/allocation/zcn.go | 16 +- .../automigration/automigration.go | 155 ------------- .../blobbercore/challenge/challenge.go | 1 + .../blobbercore/challenge/protocol.go | 2 +- .../blobbercore/challenge/timing.go | 46 ++-- .../blobbercore/challenge/worker.go | 13 +- .../blobbercore/datastore/mocket.go | 30 ++- .../blobbercore/datastore/postgres.go | 37 +++- .../blobbercore/datastore/sqlmock.go | 30 ++- .../0chain.net/blobbercore/datastore/store.go | 3 +- .../0chain.net/blobbercore/handler/context.go | 61 +++--- .../0chain.net/blobbercore/handler/handler.go | 46 ++-- .../handler/handler_hashnode_test.go | 2 +- .../handler/handler_playlist_test.go | 4 +- .../handler/handler_writemarker_test.go | 4 +- .../handler/object_operation_handler.go | 4 +- .../blobbercore/handler/protocol.go | 5 +- .../blobbercore/readmarker/worker.go | 24 +-- .../blobbercore/reference/ds_test.go | 21 +- .../blobbercore/reference/hashnode.go | 4 +- .../blobbercore/reference/hashnode_test.go | 20 +- .../blobbercore/reference/playlist.go | 55 ++--- .../blobbercore/reference/referencepath.go | 146 +++++++------ .../blobbercore/stats/blobberstats.go | 5 +- .../blobbercore/stats/challengestats.go | 5 +- .../0chain.net/blobbercore/stats/handler.go | 6 - .../blobbercore/writemarker/mutex.go | 4 +- .../blobbercore/writemarker/mutext_test.go | 9 +- .../blobbercore/writemarker/protocol.go | 2 +- .../blobbercore/writemarker/worker.go | 13 +- 39 files changed, 534 insertions(+), 576 deletions(-) delete mode 100644 code/go/0chain.net/blobbercore/automigration/automigration.go diff --git a/code/go/0chain.net/blobber/config.go b/code/go/0chain.net/blobber/config.go index e25e615d5..f530db0af 100644 --- a/code/go/0chain.net/blobber/config.go +++ b/code/go/0chain.net/blobber/config.go @@ -105,29 +105,30 @@ func setupConfig(configDir string, deploymentMode int) { func reloadConfig() error { fmt.Print("> reload config") - db := datastore.GetStore().GetDB() + return datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + s, ok := config.Get(ctx, datastore.GetStore().GetDB()) + if ok { + if err := s.CopyTo(&config.Configuration); err != nil { + return err + } + fmt.Print(" [OK]\n") + return nil + } + + config.Configuration.Capacity = viper.GetInt64("capacity") + + config.Configuration.MinLockDemand = viper.GetFloat64("min_lock_demand") + config.Configuration.NumDelegates = viper.GetInt("num_delegates") + config.Configuration.ReadPrice = viper.GetFloat64("read_price") + config.Configuration.ServiceCharge = viper.GetFloat64("service_charge") + config.Configuration.WritePrice = viper.GetFloat64("write_price") - s, ok := config.Get(context.TODO(), db) - if ok { - if err := s.CopyTo(&config.Configuration); err != nil { + if err := config.Update(ctx, datastore.GetStore().GetDB()); err != nil { return err } + fmt.Print(" [OK]\n") return nil - } - config.Configuration.Capacity = viper.GetInt64("capacity") - - config.Configuration.MinLockDemand = viper.GetFloat64("min_lock_demand") - config.Configuration.NumDelegates = viper.GetInt("num_delegates") - config.Configuration.ReadPrice = viper.GetFloat64("read_price") - config.Configuration.ServiceCharge = viper.GetFloat64("service_charge") - config.Configuration.WritePrice = viper.GetFloat64("write_price") - - if err := config.Update(context.TODO(), db); err != nil { - return err - } - - fmt.Print(" [OK]\n") - return nil + }) } diff --git a/code/go/0chain.net/blobber/http.go b/code/go/0chain.net/blobber/http.go index 230457889..0ec89c658 100644 --- a/code/go/0chain.net/blobber/http.go +++ b/code/go/0chain.net/blobber/http.go @@ -3,6 +3,7 @@ package main import ( "fmt" "net/http" + "net/http/pprof" "runtime" "strconv" "sync" @@ -53,19 +54,38 @@ func startServer(wg *sync.WaitGroup, r *mux.Router, mode string, port int, isTls //address := publicIP + ":" + portString address := ":" + strconv.Itoa(port) var server *http.Server + var profServer *http.Server if config.Development() { // No WriteTimeout setup to enable pprof server = &http.Server{ Addr: address, ReadHeaderTimeout: 30 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 30 * time.Second, MaxHeaderBytes: 1 << 20, Handler: r, } + + pprofMux := http.NewServeMux() + profServer = &http.Server{ + Addr: fmt.Sprintf(":%d", port-1000), + ReadTimeout: 30 * time.Second, + MaxHeaderBytes: 1 << 20, + Handler: pprofMux, + } + initProfHandlers(pprofMux) + go func() { + err2 := profServer.ListenAndServe() + logging.Logger.Error("Http server shut down", zap.Error(err2)) + }() + } else { server = &http.Server{ Addr: address, ReadHeaderTimeout: 30 * time.Second, + ReadTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second, IdleTimeout: 30 * time.Second, MaxHeaderBytes: 1 << 20, @@ -91,3 +111,11 @@ func initHandlers(r *mux.Router) { handler.SetupSwagger() common.SetAdminCredentials() } + +func initProfHandlers(mux *http.ServeMux) { + mux.HandleFunc("/debug/pprof/", handler.RateLimitByGeneralRL(pprof.Index)) + mux.HandleFunc("/debug/pprof/cmdline", handler.RateLimitByGeneralRL(pprof.Cmdline)) + mux.HandleFunc("/debug/pprof/profile", handler.RateLimitByGeneralRL(pprof.Profile)) + mux.HandleFunc("/debug/pprof/symbol", handler.RateLimitByGeneralRL(pprof.Symbol)) + mux.HandleFunc("/debug/pprof/trace", handler.RateLimitByGeneralRL(pprof.Trace)) +} diff --git a/code/go/0chain.net/blobber/worker.go b/code/go/0chain.net/blobber/worker.go index 0a502dae1..a5d5aa5c9 100644 --- a/code/go/0chain.net/blobber/worker.go +++ b/code/go/0chain.net/blobber/worker.go @@ -11,7 +11,6 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/handler" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/readmarker" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker" - "github.com/0chain/blobber/code/go/0chain.net/core/common" "github.com/0chain/blobber/code/go/0chain.net/core/logging" "go.uber.org/zap" @@ -30,13 +29,15 @@ func setupWorkers(ctx context.Context) { // startRefreshSettings sync settings from blockchain func startRefreshSettings(ctx context.Context) { const REPEAT_DELAY = 60 * 3 // 3 minutes - var err error for { select { case <-ctx.Done(): return case <-time.After(REPEAT_DELAY * time.Second): - _, err = config.ReloadFromChain(common.GetRootContext(), datastore.GetStore().GetDB()) + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + _, e := config.ReloadFromChain(ctx, datastore.GetStore().GetDB()) + return e + }) if err != nil { logging.Logger.Warn("failed to refresh blobber settings from chain", zap.Error(err)) continue diff --git a/code/go/0chain.net/blobbercore/allocation/allocationchange.go b/code/go/0chain.net/blobbercore/allocation/allocationchange.go index 703009b63..ba39c8a8c 100644 --- a/code/go/0chain.net/blobbercore/allocation/allocationchange.go +++ b/code/go/0chain.net/blobbercore/allocation/allocationchange.go @@ -254,154 +254,147 @@ func (a *AllocationChangeCollector) MoveToFilestore(ctx context.Context) (err er logging.Logger.Info("Move to filestore", zap.String("allocation_id", a.AllocationID)) - tx := datastore.GetStore().GetDB().Begin() - var refs []*Result limitCh := make(chan struct{}, 10) wg := &sync.WaitGroup{} - err = tx.Model(&reference.Ref{}).Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).Select("id", "validation_root", "thumbnail_hash", "prev_validation_root", "prev_thumbnail_hash").Where("allocation_id=? AND is_precommit=? AND type=?", a.AllocationID, true, reference.FILE). - FindInBatches(&refs, 50, func(tx *gorm.DB, batch int) error { + e := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) + err = tx.Model(&reference.Ref{}).Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).Select("id", "validation_root", "thumbnail_hash", "prev_validation_root", "prev_thumbnail_hash").Where("allocation_id=? AND is_precommit=? AND type=?", a.AllocationID, true, reference.FILE). + FindInBatches(&refs, 50, func(tx *gorm.DB, batch int) error { - for _, ref := range refs { + for _, ref := range refs { - var count int64 - if ref.PrevValidationRoot != "" { - tx.Model(&reference.Ref{}). - Where("allocation_id=? AND validation_root=?", a.AllocationID, ref.PrevValidationRoot). - Count(&count) - } + var count int64 + if ref.PrevValidationRoot != "" { + tx.Model(&reference.Ref{}). + Where("allocation_id=? AND validation_root=?", a.AllocationID, ref.PrevValidationRoot). + Count(&count) + } - limitCh <- struct{}{} - wg.Add(1) + limitCh <- struct{}{} + wg.Add(1) - go func(ref *Result) { - defer func() { - <-limitCh - wg.Done() - }() + go func(ref *Result) { + defer func() { + <-limitCh + wg.Done() + }() - if count == 0 && ref.PrevValidationRoot != "" { - err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevValidationRoot) + if count == 0 && ref.PrevValidationRoot != "" { + err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevValidationRoot) + if err != nil { + logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()), + zap.String("validation_root", ref.ValidationRoot)) + } + } + err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ValidationRoot) if err != nil { - logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()), + logging.Logger.Error(fmt.Sprintf("Error while moving file: %s", err.Error()), zap.String("validation_root", ref.ValidationRoot)) } - } - err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ValidationRoot) - if err != nil { - logging.Logger.Error(fmt.Sprintf("Error while moving file: %s", err.Error()), - zap.String("validation_root", ref.ValidationRoot)) - } - if ref.ThumbnailHash != "" && ref.ThumbnailHash != ref.PrevThumbnailHash { - if ref.PrevThumbnailHash != "" { - err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevThumbnailHash) + if ref.ThumbnailHash != "" && ref.ThumbnailHash != ref.PrevThumbnailHash { + if ref.PrevThumbnailHash != "" { + err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevThumbnailHash) + if err != nil { + logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail file: %s", err.Error()), + zap.String("thumbnail_hash", ref.ThumbnailHash)) + } + } + err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ThumbnailHash) if err != nil { - logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail file: %s", err.Error()), + logging.Logger.Error(fmt.Sprintf("Error while moving thumbnail file: %s", err.Error()), zap.String("thumbnail_hash", ref.ThumbnailHash)) } } - err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ThumbnailHash) - if err != nil { - logging.Logger.Error(fmt.Sprintf("Error while moving thumbnail file: %s", err.Error()), - zap.String("thumbnail_hash", ref.ThumbnailHash)) - } - } - - }(ref) - } - return nil - }).Error + }(ref) + } - wg.Wait() + return nil + }).Error - if err != nil { - logging.Logger.Error("Error while moving to filestore", zap.Error(err)) - tx.Rollback() - return err - } + wg.Wait() - err = tx.Exec("UPDATE reference_objects SET is_precommit=?, prev_validation_root=validation_root, prev_thumbnail_hash=thumbnail_hash WHERE allocation_id=? AND is_precommit=? AND deleted_at is NULL", false, a.AllocationID, true).Error + if err != nil { + logging.Logger.Error("Error while moving to filestore", zap.Error(err)) + return err + } - if err != nil { - tx.Rollback() - return err + return tx.Exec("UPDATE reference_objects SET is_precommit=?, prev_validation_root=validation_root, prev_thumbnail_hash=thumbnail_hash WHERE allocation_id=? AND is_precommit=? AND deleted_at is NULL", false, a.AllocationID, true).Error + }) + if e != nil { + return e } - tx.Commit() + return deleteFromFileStore(ctx, a.AllocationID) } func deleteFromFileStore(ctx context.Context, allocationID string) error { - - db := datastore.GetStore().GetDB().Begin() limitCh := make(chan struct{}, 10) wg := &sync.WaitGroup{} var results []Result - err := db.Model(&reference.Ref{}).Unscoped().Select("id", "validation_root", "thumbnail_hash"). - Where("allocation_id=? AND is_precommit=? AND type=? AND deleted_at is not NULL", allocationID, true, reference.FILE). - FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error { + return datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + db := datastore.GetStore().GetTransaction(ctx) - for _, res := range results { - var count int64 - tx.Model(&reference.Ref{}). - Where("allocation_id=? AND validation_root=?", allocationID, res.ValidationRoot). - Count(&count) + err := db.Model(&reference.Ref{}).Unscoped().Select("id", "validation_root", "thumbnail_hash"). + Where("allocation_id=? AND is_precommit=? AND type=? AND deleted_at is not NULL", allocationID, true, reference.FILE). + FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error { - if count != 0 && res.ThumbnailHash == "" { - continue - } + for _, res := range results { + var count int64 + tx.Model(&reference.Ref{}). + Where("allocation_id=? AND validation_root=?", allocationID, res.ValidationRoot). + Count(&count) + + if count != 0 && res.ThumbnailHash == "" { + continue + } - limitCh <- struct{}{} - wg.Add(1) + limitCh <- struct{}{} + wg.Add(1) - go func(res Result, count int64) { - defer func() { - <-limitCh - wg.Done() - }() + go func(res Result, count int64) { + defer func() { + <-limitCh + wg.Done() + }() - if count == 0 { - err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ValidationRoot) - if err != nil { - logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()), - zap.String("validation_root", res.ValidationRoot)) + if count == 0 { + err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ValidationRoot) + if err != nil { + logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()), + zap.String("validation_root", res.ValidationRoot)) + } } - } - if res.ThumbnailHash != "" { - err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ThumbnailHash) - if err != nil { - logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail: %s", err.Error()), - zap.String("thumbnail", res.ThumbnailHash)) + if res.ThumbnailHash != "" { + err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ThumbnailHash) + if err != nil { + logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail: %s", err.Error()), + zap.String("thumbnail", res.ThumbnailHash)) + } } - } - }(res, count) + }(res, count) - } - return nil - }).Error + } + return nil + }).Error - wg.Wait() - if err != nil && err != gorm.ErrRecordNotFound { - logging.Logger.Error("DeleteFromFileStore", zap.Error(err)) - db.Rollback() - return err - } + wg.Wait() + if err != nil && err != gorm.ErrRecordNotFound { + logging.Logger.Error("DeleteFromFileStore", zap.Error(err)) + return err + } - err = db.Model(&reference.Ref{}).Unscoped(). - Delete(&reference.Ref{}, - "allocation_id = ? AND deleted_at IS NOT NULL", - allocationID).Error - if err != nil { - db.Rollback() - return err - } - db.Commit() - return nil + return db.Model(&reference.Ref{}).Unscoped(). + Delete(&reference.Ref{}, + "allocation_id = ? AND deleted_at IS NOT NULL", + allocationID).Error + }) } // Note: We are also fetching refPath for srcPath in copy operation diff --git a/code/go/0chain.net/blobbercore/allocation/copyfilechange.go b/code/go/0chain.net/blobbercore/allocation/copyfilechange.go index 2369e7eee..44a8455cf 100644 --- a/code/go/0chain.net/blobbercore/allocation/copyfilechange.go +++ b/code/go/0chain.net/blobbercore/allocation/copyfilechange.go @@ -27,7 +27,7 @@ func (rf *CopyFileChange) DeleteTempFile() error { func (rf *CopyFileChange) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange, allocationRoot string, ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error) { - totalRefs, err := reference.CountRefs(rf.AllocationID) + totalRefs, err := reference.CountRefs(ctx, rf.AllocationID) if err != nil { return nil, err } diff --git a/code/go/0chain.net/blobbercore/allocation/dao.go b/code/go/0chain.net/blobbercore/allocation/dao.go index edeea5ed1..a62005e4f 100644 --- a/code/go/0chain.net/blobbercore/allocation/dao.go +++ b/code/go/0chain.net/blobbercore/allocation/dao.go @@ -3,7 +3,6 @@ package allocation import ( "context" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/common" "github.com/0chain/errors" "github.com/0chain/gosdk/constants" @@ -11,15 +10,13 @@ import ( ) // GetOrCreate, get allocation if it exists in db. if not, try to sync it from blockchain, and insert it in db. -func GetOrCreate(ctx context.Context, store datastore.Store, allocationId string) (*Allocation, error) { - - db := store.CreateTransaction(ctx) - +func GetOrCreate(ctx context.Context, allocationId string) (*Allocation, error) { if len(allocationId) == 0 { return nil, errors.Throw(constants.ErrInvalidParameter, "tx") } - alloc, err := Repo.GetById(db, allocationId) + alloc, err := Repo.GetById(ctx, allocationId) + if err == nil { return alloc, nil } @@ -31,18 +28,3 @@ func GetOrCreate(ctx context.Context, store datastore.Store, allocationId string return SyncAllocation(allocationId) } - -// DryRun Creates a prepared statement when executing any SQL and caches them to speed up future calls -// https://gorm.io/docs/performance.html#Caches-Prepared-Statement -func DryRun(db *gorm.DB) { - - // https://gorm.io/docs/session.html#DryRun - // Session mode - tx := db.Session(&gorm.Session{PrepareStmt: true, DryRun: true}) - - // use Table instead of Model to reduce reflect times - - // prepare statement for GetOrCreate - tx.Table(TableNameAllocation).Where(SQLWhereGetByTx, "tx").First(&Allocation{}) - -} diff --git a/code/go/0chain.net/blobbercore/allocation/file_changer_upload.go b/code/go/0chain.net/blobbercore/allocation/file_changer_upload.go index c45bb33cf..fe35fb618 100644 --- a/code/go/0chain.net/blobbercore/allocation/file_changer_upload.go +++ b/code/go/0chain.net/blobbercore/allocation/file_changer_upload.go @@ -24,7 +24,7 @@ type UploadFileChanger struct { func (nf *UploadFileChanger) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange, allocationRoot string, ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error) { - totalRefs, err := reference.CountRefs(nf.AllocationID) + totalRefs, err := reference.CountRefs(ctx, nf.AllocationID) if err != nil { return nil, err } diff --git a/code/go/0chain.net/blobbercore/allocation/newdirchange.go b/code/go/0chain.net/blobbercore/allocation/newdirchange.go index 9fa625a53..988ee4d27 100644 --- a/code/go/0chain.net/blobbercore/allocation/newdirchange.go +++ b/code/go/0chain.net/blobbercore/allocation/newdirchange.go @@ -23,7 +23,7 @@ type NewDir struct { func (nf *NewDir) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange, allocationRoot string, ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error) { - totalRefs, err := reference.CountRefs(nf.AllocationID) + totalRefs, err := reference.CountRefs(ctx, nf.AllocationID) if err != nil { return nil, err } diff --git a/code/go/0chain.net/blobbercore/allocation/workers.go b/code/go/0chain.net/blobbercore/allocation/workers.go index 952c35a7c..44306b911 100644 --- a/code/go/0chain.net/blobbercore/allocation/workers.go +++ b/code/go/0chain.net/blobbercore/allocation/workers.go @@ -13,8 +13,6 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/core/logging" "github.com/0chain/blobber/code/go/0chain.net/core/transaction" - "gorm.io/gorm" - "go.uber.org/zap" ) @@ -46,7 +44,10 @@ func UpdateWorker(ctx context.Context, interval time.Duration) { for { select { case <-tick: - updateWork(ctx) + _ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + updateWork(ctx) + return nil + }) case <-quit: return } @@ -110,12 +111,6 @@ func updateWork(ctx context.Context) { // not finalized, not cleaned up func findAllocations(ctx context.Context, offset int64) (allocs []*Allocation, count int, err error) { - - ctx = datastore.GetStore().CreateTransaction(ctx) - - var tx = datastore.GetStore().GetTransaction(ctx) - defer tx.Rollback() - allocations, err := Repo.GetAllocations(ctx, offset) return allocations, len(allocations), err } @@ -172,20 +167,8 @@ func requestAllocation(allocID string) (sa *transaction.StorageAllocation, err e return } -func commit(tx *gorm.DB, err *error) { - if (*err) != nil { - tx.Rollback() - return - } - (*err) = tx.Commit().Error -} - func updateAllocationInDB(ctx context.Context, a *Allocation, sa *transaction.StorageAllocation) (ua *Allocation, err error) { - ctx = datastore.GetStore().CreateTransaction(ctx) - var tx = datastore.GetStore().GetTransaction(ctx) - defer commit(tx.DB, &err) - var changed bool = a.Tx != sa.Tx // transaction @@ -262,9 +245,7 @@ func cleanupAllocation(ctx context.Context, a *Allocation) { logging.Logger.Error("cleaning finalized allocation", zap.Error(err)) } - ctx = datastore.GetStore().CreateTransaction(ctx) var tx = datastore.GetStore().GetTransaction(ctx) - defer commit(tx.DB, &err) a.CleanedUp = true if err = tx.Model(a).Updates(a).Error; err != nil { @@ -273,10 +254,7 @@ func cleanupAllocation(ctx context.Context, a *Allocation) { } func deleteAllocation(ctx context.Context, a *Allocation) (err error) { - ctx = datastore.GetStore().CreateTransaction(ctx) var tx = datastore.GetStore().GetTransaction(ctx) - defer commit(tx.DB, &err) - filestore.GetFileStore().DeleteAllocation(a.ID) err = tx.Model(&reference.Ref{}).Unscoped(). Delete(&reference.Ref{}, diff --git a/code/go/0chain.net/blobbercore/allocation/zcn.go b/code/go/0chain.net/blobbercore/allocation/zcn.go index 3ff8ac26b..bc7b0c463 100644 --- a/code/go/0chain.net/blobbercore/allocation/zcn.go +++ b/code/go/0chain.net/blobbercore/allocation/zcn.go @@ -10,7 +10,6 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/core/node" "github.com/0chain/errors" "go.uber.org/zap" - "gorm.io/gorm" ) // SyncAllocation try to pull allocation from blockchain, and insert it in db. @@ -65,19 +64,18 @@ func SyncAllocation(allocationId string) (*Allocation, error) { }) } - err = datastore.GetStore().GetDB().Transaction(func(tx *gorm.DB) error { - ctx := datastore.GetStore().WithTransaction(context.Background(), tx) - if err := Repo.Save(ctx, alloc); err != nil { - return err + err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + var e error + if e := Repo.Save(ctx, alloc); e != nil { + return e } - + tx := datastore.GetStore().GetTransaction(ctx) for _, term := range terms { if err := tx.Table(TableNameTerms).Save(term).Error; err != nil { - return err + return e } } - - return nil + return e }) if err != nil { diff --git a/code/go/0chain.net/blobbercore/automigration/automigration.go b/code/go/0chain.net/blobbercore/automigration/automigration.go deleted file mode 100644 index b31631c11..000000000 --- a/code/go/0chain.net/blobbercore/automigration/automigration.go +++ /dev/null @@ -1,155 +0,0 @@ -//This file is used to create table schemas using gorm's automigration feature which takes information from -//struct's fields and functions - -package automigration - -import ( - "fmt" - - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/challenge" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/readmarker" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker" - "gorm.io/gorm" -) - -type tableNameI interface { - TableName() string -} - -var tableModels = []tableNameI{ - new(reference.Ref), - new(reference.ShareInfo), - new(challenge.ChallengeEntity), - new(challenge.ChallengeTiming), - new(allocation.Allocation), - new(allocation.AllocationChange), - new(allocation.AllocationChangeCollector), - new(allocation.Pending), - new(allocation.Terms), - new(allocation.ReadPool), - new(allocation.WritePool), - new(readmarker.ReadMarkerEntity), - new(writemarker.WriteMarkerEntity), - new(writemarker.WriteLock), - new(reference.FileStats), - new(config.Settings), -} - -func AutoMigrate(pgDB *gorm.DB) error { - if err := createUser(pgDB); err != nil { - return err - } - - if err := createDB(pgDB); err != nil { - return err - } - - if err := grantPrivileges(pgDB); err != nil { - return err - } - - d, err := pgDB.DB() - if err != nil { - return err - } - - if err := d.Close(); err != nil { - return err - } - - if err := datastore.GetStore().Open(); err != nil { - return err - } - - db := datastore.GetStore().GetDB() - return MigrateSchema(db) -} - -func createDB(db *gorm.DB) (err error) { - // check if db exists - dbstmt := fmt.Sprintf("SELECT datname, oid FROM pg_database WHERE datname = '%s';", config.Configuration.DBName) - rs := db.Raw(dbstmt) - if rs.Error != nil { - return rs.Error - } - - var result struct { - Datname string - } - - if rs.Scan(&result); len(result.Datname) == 0 { - stmt := fmt.Sprintf("CREATE DATABASE %s;", config.Configuration.DBName) - if rs := db.Exec(stmt); rs.Error != nil { - return rs.Error - } - if rs := db.Exec("CREATE EXTENSION IF NOT EXISTS pg_trgm;"); rs.Error != nil { - return rs.Error - } - } - return -} - -func createUser(db *gorm.DB) error { - usrstmt := fmt.Sprintf("SELECT usename, usesysid FROM pg_catalog.pg_user WHERE usename = '%s';", config.Configuration.DBUserName) - rs := db.Raw(usrstmt) - if rs.Error != nil { - return rs.Error - } - - var result struct { - Usename string - } - - if rs.Scan(&result); len(result.Usename) == 0 { - stmt := fmt.Sprintf("CREATE USER %s WITH ENCRYPTED PASSWORD '%s';", config.Configuration.DBUserName, config.Configuration.DBPassword) - if rs := db.Exec(stmt); rs.Error != nil && rs.Error.Error() != fmt.Sprintf("pq: role \"%s\" already exists", config.Configuration.DBUserName) { - return rs.Error - } - } - return nil -} - -func grantPrivileges(db *gorm.DB) error { - stmts := []string{ - fmt.Sprintf("GRANT ALL PRIVILEGES ON DATABASE %s TO %s;", config.Configuration.DBName, config.Configuration.DBUserName), - fmt.Sprintf("GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO %s;", config.Configuration.DBUserName), - fmt.Sprintf("GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO %s;", config.Configuration.DBUserName), - } - for _, stmt := range stmts { - err := db.Exec(stmt).Error - if err != nil { - return err - } - } - return nil -} - -func MigrateSchema(db *gorm.DB) error { - var tables []interface{} // Put in new slice to resolve type mismatch - for _, tbl := range tableModels { - tables = append(tables, tbl) - } - - if err := db.AutoMigrate(tables...); err != nil { - return err - } - err := db.Exec(`ALTER TABLE reference_objects ALTER COLUMN path TYPE varchar(1000) COLLATE "POSIX"`).Error - if err != nil { - return err - } - return nil -} - -// DropSchemas is used for integration tests to clear DB. -func DropSchemas(db *gorm.DB) error { - var tables []interface{} // Put in new slice to resolve type mismatch - for _, tbl := range tableModels { - tables = append(tables, tbl) - } - - return db.Migrator().DropTable(tables...) -} diff --git a/code/go/0chain.net/blobbercore/challenge/challenge.go b/code/go/0chain.net/blobbercore/challenge/challenge.go index 4bdea9cfb..a1194af2a 100644 --- a/code/go/0chain.net/blobbercore/challenge/challenge.go +++ b/code/go/0chain.net/blobbercore/challenge/challenge.go @@ -125,6 +125,7 @@ func validateOnValidators(c *ChallengeEntity) { zap.Error(err)) deleteChallenge(c.RoundCreatedAt) tx.Rollback() + return } createdTime := common.ToTime(c.CreatedAt) diff --git a/code/go/0chain.net/blobbercore/challenge/protocol.go b/code/go/0chain.net/blobbercore/challenge/protocol.go index 67a853300..b50554ab4 100644 --- a/code/go/0chain.net/blobbercore/challenge/protocol.go +++ b/code/go/0chain.net/blobbercore/challenge/protocol.go @@ -82,7 +82,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { allocationObj, err := allocation.GetAllocationByID(ctx, cr.AllocationID) if err != nil { allocMu.RUnlock() - cr.CancelChallenge(ctx, ErrNoValidator) + cr.CancelChallenge(ctx, err) return err } diff --git a/code/go/0chain.net/blobbercore/challenge/timing.go b/code/go/0chain.net/blobbercore/challenge/timing.go index 938e237f7..ca7a78c20 100644 --- a/code/go/0chain.net/blobbercore/challenge/timing.go +++ b/code/go/0chain.net/blobbercore/challenge/timing.go @@ -1,6 +1,7 @@ package challenge import ( + "context" "fmt" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" @@ -61,7 +62,8 @@ func CreateChallengeTiming(challengeID string, createdAt common.Timestamp) error CreatedAtChain: createdAt, } - err := datastore.GetStore().GetDB().Transaction(func(tx *gorm.DB) error { + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) return tx.Create(c).Error }) @@ -73,7 +75,8 @@ func UpdateChallengeTimingCancellation(challengeID string, cancellation common.T ChallengeID: challengeID, } - err := datastore.GetStore().GetDB().Transaction(func(tx *gorm.DB) error { + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) values := map[string]interface{}{ "closed_at": cancellation, } @@ -92,8 +95,8 @@ func UpdateChallengeTimingCompleteValidation(challengeID string, completeValidat c := &ChallengeTiming{ ChallengeID: challengeID, } - - err := datastore.GetStore().GetDB().Transaction(func(tx *gorm.DB) error { + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) return tx.Model(&c).Update("complete_validation", completeValidation).Error }) @@ -111,7 +114,8 @@ func UpdateChallengeTimingProofGenerationAndFileSize( ChallengeID: challengeID, } - err := datastore.GetStore().GetDB().Transaction(func(tx *gorm.DB) error { + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) values := map[string]interface{}{ "proof_gen_time": proofGenTime, "file_size": size, @@ -127,7 +131,8 @@ func UpdateChallengeTimingTxnSubmission(challengeID string, txnSubmission common ChallengeID: challengeID, } - err := datastore.GetStore().GetDB().Transaction(func(tx *gorm.DB) error { + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) return tx.Model(&c).Update("txn_submission", txnSubmission).Error }) @@ -139,7 +144,8 @@ func UpdateChallengeTimingTxnVerification(challengeID string, txnVerification co ChallengeID: challengeID, } - err := datastore.GetStore().GetDB().Transaction(func(tx *gorm.DB) error { + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) values := map[string]interface{}{ "txn_verification": txnVerification, "closed_at": txnVerification, @@ -152,18 +158,21 @@ func UpdateChallengeTimingTxnVerification(challengeID string, txnVerification co } func GetChallengeTimings(from common.Timestamp, limit common.Pagination) ([]*ChallengeTiming, error) { - query := datastore.GetStore().GetDB().Model(&ChallengeTiming{}). - Where("closed_at > ?", from).Limit(limit.Limit).Offset(limit.Offset).Order(clause.OrderByColumn{ - Column: clause.Column{Name: "closed_at"}, - Desc: limit.IsDescending, - }) - var chs []*ChallengeTiming + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) + query := tx.Model(&ChallengeTiming{}). + Where("closed_at > ?", from).Limit(limit.Limit).Offset(limit.Offset).Order(clause.OrderByColumn{ + Column: clause.Column{Name: "closed_at"}, + Desc: limit.IsDescending, + }) + + return query.Find(&chs).Error + }) - result := query.Find(&chs) - if result.Error != nil { + if err != nil { return nil, fmt.Errorf("error retrieving updated challenge timings with %v; error: %v", - from, result.Error) + from, err) } return chs, nil } @@ -171,6 +180,9 @@ func GetChallengeTimings(from common.Timestamp, limit common.Pagination) ([]*Cha func GetChallengeTiming(challengeID string) (*ChallengeTiming, error) { var ch *ChallengeTiming - err := datastore.GetStore().GetDB().Model(&ChallengeTiming{}).Where("challenge_id = ?", challengeID).First(&ch).Error + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) + return tx.Model(&ChallengeTiming{}).Where("challenge_id = ?", challengeID).First(&ch).Error + }) return ch, err } diff --git a/code/go/0chain.net/blobbercore/challenge/worker.go b/code/go/0chain.net/blobbercore/challenge/worker.go index 509500819..bb2840aa3 100644 --- a/code/go/0chain.net/blobbercore/challenge/worker.go +++ b/code/go/0chain.net/blobbercore/challenge/worker.go @@ -84,9 +84,15 @@ func challengeProcessor(ctx context.Context) { case it := <-toProcessChallenge: logging.Logger.Info("processing_challenge", zap.Any("challenge_id", it.ChallengeID)) - if ok := it.createChallenge(); !ok { + var result bool + _ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + result = it.createChallenge(ctx) + return nil + }) + if !result { continue } + err := sem.Acquire(ctx, 1) if err != nil { logging.Logger.Error("failed to acquire semaphore", zap.Error(err)) @@ -176,13 +182,14 @@ func getBatch(batchSize int) (chall []ChallengeEntity) { return } -func (it *ChallengeEntity) createChallenge() bool { +func (it *ChallengeEntity) createChallenge(ctx context.Context) bool { + db := datastore.GetStore().GetTransaction(ctx) + challengeMapLock.Lock() defer challengeMapLock.Unlock() if _, ok := challengeMap.Get(it.RoundCreatedAt); ok { return false } - db := datastore.GetStore().GetDB() var Found bool err := db.Raw("SELECT EXISTS(SELECT 1 FROM challenge_timing WHERE challenge_id = ?) AS found", it.ChallengeID).Scan(&Found).Error if err != nil { diff --git a/code/go/0chain.net/blobbercore/datastore/mocket.go b/code/go/0chain.net/blobbercore/datastore/mocket.go index de975d32f..e5f0ef47d 100644 --- a/code/go/0chain.net/blobbercore/datastore/mocket.go +++ b/code/go/0chain.net/blobbercore/datastore/mocket.go @@ -85,8 +85,34 @@ func (store *Mocket) GetTransaction(ctx context.Context) *EnhancedDB { return nil } -func (store *Mocket) WithTransaction(ctx context.Context, tx *gorm.DB) context.Context { - return context.WithValue(ctx, ContextKeyTransaction, EnhanceDB(tx)) +func (store *Mocket) WithNewTransaction(f func(ctx context.Context) error) error { + ctx := store.CreateTransaction(context.TODO()) + defer ctx.Done() + + tx := store.GetTransaction(ctx) + err := f(ctx) + if err != nil { + tx.Rollback() + return err + } + tx.Commit() + return nil +} + +func (store *Mocket) WithTransaction(ctx context.Context, f func(ctx context.Context) error) error { + tx := store.GetTransaction(ctx) + if tx == nil { + ctx = store.CreateTransaction(ctx) + tx = store.GetTransaction(ctx) + } + + err := f(ctx) + if err != nil { + tx.Rollback() + return err + } + tx.Commit() + return nil } func (store *Mocket) GetDB() *gorm.DB { diff --git a/code/go/0chain.net/blobbercore/datastore/postgres.go b/code/go/0chain.net/blobbercore/datastore/postgres.go index 0d237aaac..2bfcde8e3 100644 --- a/code/go/0chain.net/blobbercore/datastore/postgres.go +++ b/code/go/0chain.net/blobbercore/datastore/postgres.go @@ -68,7 +68,8 @@ func (store *postgresStore) Open() error { sqldb.SetMaxIdleConns(100) sqldb.SetMaxOpenConns(200) - sqldb.SetConnMaxLifetime(30 * time.Second) + sqldb.SetConnMaxLifetime(60 * time.Second) + sqldb.SetConnMaxIdleTime(60 * time.Second) // Enable Logger, show detailed log //db.LogMode(true) store.db = db @@ -84,6 +85,11 @@ func (store *postgresStore) Close() { } func (store *postgresStore) CreateTransaction(ctx context.Context) context.Context { + //conn := ctx.Value(ContextKeyTransaction) + //if conn != nil { + // return ctx + //} + db := store.db.Begin() return context.WithValue(ctx, ContextKeyTransaction, EnhanceDB(db)) } @@ -97,8 +103,33 @@ func (store *postgresStore) GetTransaction(ctx context.Context) *EnhancedDB { return nil } -func (store *postgresStore) WithTransaction(ctx context.Context, tx *gorm.DB) context.Context { - return context.WithValue(ctx, ContextKeyTransaction, EnhanceDB(tx)) +func (store *postgresStore) WithNewTransaction(f func(ctx context.Context) error) error { + ctx := store.CreateTransaction(context.TODO()) + defer ctx.Done() + + tx := store.GetTransaction(ctx) + err := f(ctx) + if err != nil { + tx.Rollback() + return err + } + tx.Commit() + return nil +} +func (store *postgresStore) WithTransaction(ctx context.Context, f func(ctx context.Context) error) error { + tx := store.GetTransaction(ctx) + if tx == nil { + ctx = store.CreateTransaction(ctx) + tx = store.GetTransaction(ctx) + } + + err := f(ctx) + if err != nil { + tx.Rollback() + return err + } + tx.Commit() + return nil } func (store *postgresStore) GetDB() *gorm.DB { diff --git a/code/go/0chain.net/blobbercore/datastore/sqlmock.go b/code/go/0chain.net/blobbercore/datastore/sqlmock.go index f0fdc6a3f..b528ec73b 100644 --- a/code/go/0chain.net/blobbercore/datastore/sqlmock.go +++ b/code/go/0chain.net/blobbercore/datastore/sqlmock.go @@ -80,8 +80,34 @@ func (store *Sqlmock) GetTransaction(ctx context.Context) *EnhancedDB { return nil } -func (store *Sqlmock) WithTransaction(ctx context.Context, tx *gorm.DB) context.Context { - return context.WithValue(ctx, ContextKeyTransaction, EnhanceDB(tx)) +func (store *Sqlmock) WithNewTransaction(f func(ctx context.Context) error) error { + ctx := store.CreateTransaction(context.TODO()) + defer ctx.Done() + + tx := store.GetTransaction(ctx) + err := f(ctx) + if err != nil { + tx.Rollback() + return err + } + tx.Commit() + return nil +} + +func (store *Sqlmock) WithTransaction(ctx context.Context, f func(ctx context.Context) error) error { + tx := store.GetTransaction(ctx) + if tx == nil { + ctx = store.CreateTransaction(ctx) + tx = store.GetTransaction(ctx) + } + + err := f(ctx) + if err != nil { + tx.Rollback() + return err + } + tx.Commit() + return nil } func (store *Sqlmock) GetDB() *gorm.DB { diff --git a/code/go/0chain.net/blobbercore/datastore/store.go b/code/go/0chain.net/blobbercore/datastore/store.go index e24ca8804..866ace2d4 100644 --- a/code/go/0chain.net/blobbercore/datastore/store.go +++ b/code/go/0chain.net/blobbercore/datastore/store.go @@ -31,7 +31,8 @@ type Store interface { CreateTransaction(ctx context.Context) context.Context // GetTransaction get transaction from context GetTransaction(ctx context.Context) *EnhancedDB - WithTransaction(ctx context.Context, tx *gorm.DB) context.Context + WithNewTransaction(f func(ctx context.Context) error) error + WithTransaction(ctx context.Context, f func(ctx context.Context) error) error // Get db connection with user that creates roles and databases. Its dialactor does not contain database name GetPgDB() (*gorm.DB, error) Open() error diff --git a/code/go/0chain.net/blobbercore/handler/context.go b/code/go/0chain.net/blobbercore/handler/context.go index cf8f81e89..10c1d0ba3 100644 --- a/code/go/0chain.net/blobbercore/handler/context.go +++ b/code/go/0chain.net/blobbercore/handler/context.go @@ -130,8 +130,8 @@ type ErrorResponse struct { Error string } -// WithHandler process handler to respond request -func WithHandler(handler func(ctx *Context) (interface{}, error)) func(w http.ResponseWriter, r *http.Request) { +// WithTxHandler process handler to respond request +func WithTxHandler(handler func(ctx *Context) (interface{}, error)) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") // CORS for all. w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE") @@ -143,23 +143,39 @@ func WithHandler(handler func(ctx *Context) (interface{}, error)) func(w http.Re } common.TryParseForm(r) - w.Header().Set("Content-Type", "application/json") - ctx, err := WithVerify(r) - statusCode := ctx.StatusCode + statusCode := 0 + var result interface{} + err := datastore.GetStore().WithNewTransaction(func(c context.Context) error { + ctx := &Context{ + Context: c, + Request: r, + Store: datastore.GetStore(), + } - if err != nil { - if statusCode == 0 { - statusCode = http.StatusInternalServerError + ctx.Vars = mux.Vars(r) + if ctx.Vars == nil { + ctx.Vars = make(map[string]string) } - http.Error(w, err.Error(), statusCode) - return - } + ctx.ClientID = r.Header.Get(common.ClientHeader) + ctx.ClientKey = r.Header.Get(common.ClientKeyHeader) + ctx.AllocationId = r.Header.Get(common.AllocationIdHeader) + ctx.Signature = r.Header.Get(common.ClientSignatureHeader) - result, err := handler(ctx) - statusCode = ctx.StatusCode + ctx, err := WithVerify(ctx, r) + statusCode = ctx.StatusCode + + if err != nil { + return err + } + + result, err = handler(ctx) + statusCode = ctx.StatusCode + + return err + }) if err != nil { if statusCode == 0 { @@ -183,28 +199,13 @@ func WithHandler(handler func(ctx *Context) (interface{}, error)) func(w http.Re } // WithVerify verify allocation and signature -func WithVerify(r *http.Request) (*Context, error) { - - ctx := &Context{ - Context: context.TODO(), - Request: r, - Store: datastore.GetStore(), - } - - ctx.Vars = mux.Vars(r) - if ctx.Vars == nil { - ctx.Vars = make(map[string]string) - } +func WithVerify(ctx *Context, r *http.Request) (*Context, error) { - ctx.ClientID = r.Header.Get(common.ClientHeader) - ctx.ClientKey = r.Header.Get(common.ClientKeyHeader) - ctx.AllocationId = r.Header.Get(common.AllocationIdHeader) - ctx.Signature = r.Header.Get(common.ClientSignatureHeader) allocationTx := ctx.Vars["allocation"] if len(ctx.AllocationId) > 0 { - alloc, err := allocation.GetOrCreate(ctx, ctx.Store, ctx.AllocationId) + alloc, err := allocation.GetOrCreate(ctx, ctx.AllocationId) if err != nil { if errors.Is(common.ErrBadRequest, err) { diff --git a/code/go/0chain.net/blobbercore/handler/handler.go b/code/go/0chain.net/blobbercore/handler/handler.go index 7e083f0cd..7ad981eec 100644 --- a/code/go/0chain.net/blobbercore/handler/handler.go +++ b/code/go/0chain.net/blobbercore/handler/handler.go @@ -226,7 +226,7 @@ func setupHandlers(r *mux.Router) { // r.HandleFunc("/_cleanupdisk", common.AuthenticateAdmin(common.ToJSONResponse(WithReadOnlyConnection(CleanupDiskHandler)))) r.HandleFunc("/_cleanupdisk", RateLimitByCommmitRL(common.ToJSONResponse(WithReadOnlyConnection(CleanupDiskHandler)))) // r.HandleFunc("/getstats", common.AuthenticateAdmin(common.ToJSONResponse(stats.GetStatsHandler))) - r.HandleFunc("/getstats", RateLimitByCommmitRL(common.ToJSONResponse(stats.GetStatsHandler))) + r.HandleFunc("/getstats", RateLimitByCommmitRL(common.ToJSONResponse(WithReadOnlyConnection(stats.GetStatsHandler)))) // r.HandleFunc("/challengetimings", common.AuthenticateAdmin(common.ToJSONResponse(GetChallengeTimings))) r.HandleFunc("/challengetimings", RateLimitByCommmitRL(common.ToJSONResponse(GetChallengeTimings))) r.HandleFunc("/challenge-timings-by-challengeId", RateLimitByCommmitRL(common.ToJSONResponse(GetChallengeTiming))) @@ -248,23 +248,23 @@ func setupHandlers(r *mux.Router) { // lightweight http handler without heavy postgres transaction to improve performance r.HandleFunc("/v1/writemarker/lock/{allocation}", - RateLimitByGeneralRL(WithHandler(LockWriteMarker))). + RateLimitByGeneralRL(WithTxHandler(LockWriteMarker))). Methods(http.MethodPost, http.MethodOptions) r.HandleFunc("/v1/writemarker/lock/{allocation}/{connection}", - RateLimitByGeneralRL(WithHandler(UnlockWriteMarker))). + RateLimitByGeneralRL(WithTxHandler(UnlockWriteMarker))). Methods(http.MethodDelete, http.MethodOptions) r.HandleFunc("/v1/hashnode/root/{allocation}", - RateLimitByObjectRL(WithHandler(LoadRootHashnode))). + RateLimitByObjectRL(WithTxHandler(LoadRootHashnode))). Methods(http.MethodGet, http.MethodOptions) r.HandleFunc("/v1/playlist/latest/{allocation}", - RateLimitByGeneralRL(WithHandler(LoadPlaylist))). + RateLimitByGeneralRL(WithTxHandler(LoadPlaylist))). Methods(http.MethodGet, http.MethodOptions) r.HandleFunc("/v1/playlist/file/{allocation}", - RateLimitByGeneralRL(WithHandler(LoadPlaylistFile))). + RateLimitByGeneralRL(WithTxHandler(LoadPlaylistFile))). Methods(http.MethodGet, http.MethodOptions) } @@ -281,31 +281,18 @@ func WithReadOnlyConnection(handler common.JSONResponderF) common.JSONResponderF } func WithConnection(handler common.JSONResponderF) common.JSONResponderF { - return func(ctx context.Context, r *http.Request) (resp interface{}, err error) { - ctx = GetMetaDataStore().CreateTransaction(ctx) - - resp, err = handler(ctx, r) + return func(ctx context.Context, r *http.Request) (interface{}, error) { + var ( + resp interface{} + err error + ) + err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + resp, err = handler(ctx, r) - defer func() { - if err != nil { - var rollErr = GetMetaDataStore().GetTransaction(ctx). - Rollback().Error - if rollErr != nil { - Logger.Error("couldn't rollback", zap.Error(err)) - } - } - }() + return err + }) - if err != nil { - Logger.Error("Error in handling the request." + err.Error()) - return - } - err = GetMetaDataStore().GetTransaction(ctx).Commit().Error - if err != nil { - return resp, common.NewErrorf("commit_error", - "error committing to meta store: %v", err) - } - return + return resp, err } } @@ -720,6 +707,7 @@ func writeResponse(w http.ResponseWriter, resp []byte) { } } +// todo wrap with connection func StatsHandler(w http.ResponseWriter, r *http.Request) { isJSON := r.Header.Get("Accept") == "application/json" diff --git a/code/go/0chain.net/blobbercore/handler/handler_hashnode_test.go b/code/go/0chain.net/blobbercore/handler/handler_hashnode_test.go index c6b7d0279..a44204ad5 100644 --- a/code/go/0chain.net/blobbercore/handler/handler_hashnode_test.go +++ b/code/go/0chain.net/blobbercore/handler/handler_hashnode_test.go @@ -85,7 +85,7 @@ FROM reference_objects`). } rr := httptest.NewRecorder() - handler := http.HandlerFunc(WithHandler(func(ctx *Context) (interface{}, error) { + handler := http.HandlerFunc(WithTxHandler(func(ctx *Context) (interface{}, error) { ctx.AllocationId = "allocation_handler_load_root" return LoadRootHashnode(ctx) })) diff --git a/code/go/0chain.net/blobbercore/handler/handler_playlist_test.go b/code/go/0chain.net/blobbercore/handler/handler_playlist_test.go index adcb4700c..f79bffb27 100644 --- a/code/go/0chain.net/blobbercore/handler/handler_playlist_test.go +++ b/code/go/0chain.net/blobbercore/handler/handler_playlist_test.go @@ -53,7 +53,7 @@ func TestPlaylist_LoadPlaylist(t *testing.T) { } rr := httptest.NewRecorder() - handler := http.HandlerFunc(WithHandler(func(ctx *Context) (interface{}, error) { + handler := http.HandlerFunc(WithTxHandler(func(ctx *Context) (interface{}, error) { ctx.AllocationId = "AllocationId" ctx.ClientID = "ownerid" ctx.Allocation = &allocation.Allocation{ @@ -123,7 +123,7 @@ func TestPlaylist_LoadPlaylistFile(t *testing.T) { } rr := httptest.NewRecorder() - handler := http.HandlerFunc(WithHandler(func(ctx *Context) (interface{}, error) { + handler := http.HandlerFunc(WithTxHandler(func(ctx *Context) (interface{}, error) { ctx.AllocationId = "AllocationId" ctx.ClientID = "ownerid" ctx.Allocation = &allocation.Allocation{ diff --git a/code/go/0chain.net/blobbercore/handler/handler_writemarker_test.go b/code/go/0chain.net/blobbercore/handler/handler_writemarker_test.go index 69e7b1544..9e4232aa5 100644 --- a/code/go/0chain.net/blobbercore/handler/handler_writemarker_test.go +++ b/code/go/0chain.net/blobbercore/handler/handler_writemarker_test.go @@ -41,7 +41,7 @@ func TestWriteMarkerHandlers_Lock(t *testing.T) { req.Header.Set("Content-Type", formWriter.FormDataContentType()) rr := httptest.NewRecorder() - handler := http.HandlerFunc(WithHandler(func(ctx *Context) (interface{}, error) { + handler := http.HandlerFunc(WithTxHandler(func(ctx *Context) (interface{}, error) { ctx.AllocationId = "TestHandlers_Lock_allocation_id" return LockWriteMarker(ctx) })) @@ -81,7 +81,7 @@ func TestWriteMarkerHandlers_Unlock(t *testing.T) { req.Header.Set("Content-Type", formWriter.FormDataContentType()) rr := httptest.NewRecorder() - handler := http.HandlerFunc(WithHandler(func(ctx *Context) (interface{}, error) { + handler := http.HandlerFunc(WithTxHandler(func(ctx *Context) (interface{}, error) { ctx.AllocationId = "TestHandlers_Unlock_allocation_id" ctx.Vars["connection"] = "connection_id" return UnlockWriteMarker(ctx) diff --git a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go index 4e58e8526..8904dad8a 100644 --- a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go +++ b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go @@ -1417,7 +1417,7 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob alloc.UsedSize -= latestWriteMarkerEntity.WM.Size alloc.AllocationRoot = allocationRoot alloc.FileMetaRoot = fileMetaRoot - + sendWM := !alloc.IsRedeemRequired if alloc.IsRedeemRequired { writemarkerEntity.Status = writemarker.Rollbacked alloc.IsRedeemRequired = false @@ -1436,7 +1436,7 @@ func (fsh *StorageHandler) Rollback(ctx context.Context, r *http.Request) (*blob if err != nil { return &result, common.NewError("allocation_commit_error", "Error committing the transaction "+err.Error()) } - if alloc.IsRedeemRequired { + if sendWM { err = writemarkerEntity.SendToChan(ctx) if err != nil { return nil, common.NewError("write_marker_error", "Error redeeming the write marker") diff --git a/code/go/0chain.net/blobbercore/handler/protocol.go b/code/go/0chain.net/blobbercore/handler/protocol.go index 1cc362aa2..cbfe2444d 100644 --- a/code/go/0chain.net/blobbercore/handler/protocol.go +++ b/code/go/0chain.net/blobbercore/handler/protocol.go @@ -75,8 +75,11 @@ func getStorageNode() (*transaction.StorageNode, error) { // RegisterBlobber register blobber if it is not registered yet func RegisterBlobber(ctx context.Context) error { + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + _, e := config.ReloadFromChain(ctx, datastore.GetStore().GetDB()) + return e + }) - _, err := config.ReloadFromChain(ctx, datastore.GetStore().GetDB()) if err != nil { // blobber is not registered yet txn, err := sendSmartContractBlobberAdd(ctx) if err != nil { diff --git a/code/go/0chain.net/blobbercore/readmarker/worker.go b/code/go/0chain.net/blobbercore/readmarker/worker.go index 5a580913c..f123e0179 100644 --- a/code/go/0chain.net/blobbercore/readmarker/worker.go +++ b/code/go/0chain.net/blobbercore/readmarker/worker.go @@ -82,9 +82,13 @@ func redeemReadMarkers(ctx context.Context) { } }() - rctx := datastore.GetStore().CreateTransaction(ctx) - db := datastore.GetStore().GetTransaction(rctx) - readMarkers, err := GetRedeemRequiringRMEntities(rctx) + var readMarkers []*ReadMarkerEntity + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + var err error + readMarkers, err = GetRedeemRequiringRMEntities(ctx) + return err + }) + if err != nil { logging.Logger.Error("redeem_readmarker", zap.Any("database_error", err)) return @@ -104,24 +108,16 @@ func redeemReadMarkers(ctx context.Context) { wg.Done() }() - redeemCtx = datastore.GetStore().CreateTransaction(redeemCtx) - defer redeemCtx.Done() - - err := redeemReadMarker(redeemCtx, rmEntity) + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + return redeemReadMarker(ctx, rmEntity) + }) if err != nil { logging.Logger.Error("Error redeeming the read marker.", zap.Error(err)) - datastore.GetStore().GetTransaction(redeemCtx).Rollback() return } - if err := datastore.GetStore().GetTransaction(redeemCtx).Commit().Error; err != nil { - logging.Logger.Error("Error committing the readmarker redeem", zap.Error(err)) - } }(ctx, rmEntity, &wg, guideCh) } wg.Wait() - - db.Rollback() - rctx.Done() } func startRedeemMarkers(ctx context.Context) { diff --git a/code/go/0chain.net/blobbercore/reference/ds_test.go b/code/go/0chain.net/blobbercore/reference/ds_test.go index 06bb793d8..29e90a8fc 100644 --- a/code/go/0chain.net/blobbercore/reference/ds_test.go +++ b/code/go/0chain.net/blobbercore/reference/ds_test.go @@ -1,6 +1,7 @@ package reference import ( + "context" "testing" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" @@ -18,14 +19,14 @@ func TestMockDb(t *testing.T) { config.Configuration.DBPassword = "" require.NoError(t, datastore.GetStore().Open()) - db := datastore.GetStore().GetDB() - if db == nil { - t.Log("err connecting to database") - return - } - ref := &Ref{} - err := db.Where(&Ref{AllocationID: "4f928c7857fabb5737347c42204eea919a4777f893f35724f563b932f64e2367", Path: "/hack.txt"}). - First(ref). - Error - require.NoError(t, err) + _ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) + ref := &Ref{} + err := tx.Where(&Ref{AllocationID: "4f928c7857fabb5737347c42204eea919a4777f893f35724f563b932f64e2367", Path: "/hack.txt"}). + First(ref). + Error + require.NoError(t, err) + + return nil + }) } diff --git a/code/go/0chain.net/blobbercore/reference/hashnode.go b/code/go/0chain.net/blobbercore/reference/hashnode.go index 56311768a..456bc943d 100644 --- a/code/go/0chain.net/blobbercore/reference/hashnode.go +++ b/code/go/0chain.net/blobbercore/reference/hashnode.go @@ -11,9 +11,9 @@ import ( // LoadRootHashnode load root node with its descendant nodes func LoadRootHashnode(ctx context.Context, allocationID string) (*Hashnode, error) { - db := datastore.GetStore().GetDB() + tx := datastore.GetStore().GetTransaction(ctx) - db = db.Raw(` + db := tx.Raw(` SELECT allocation_id, type, name, path, validation_root, fixed_merkle_root, actual_file_hash, chunk_size,size,actual_file_size, parent_path FROM reference_objects WHERE allocation_id = ? diff --git a/code/go/0chain.net/blobbercore/reference/hashnode_test.go b/code/go/0chain.net/blobbercore/reference/hashnode_test.go index 19a383228..a885c44a8 100644 --- a/code/go/0chain.net/blobbercore/reference/hashnode_test.go +++ b/code/go/0chain.net/blobbercore/reference/hashnode_test.go @@ -131,8 +131,14 @@ FROM reference_objects`). if it.mock != nil { it.mock() } - - r, err := LoadRootHashnode(context.TODO(), it.allocationID) + var ( + r *Hashnode + err error + ) + err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + r, err = LoadRootHashnode(ctx, it.allocationID) + return err + }) it.assert(test, it.allocationID, r, err) @@ -357,8 +363,14 @@ FROM reference_objects`). it.mock() } - r, err := LoadRootHashnode(context.TODO(), it.allocationID) - + var ( + r *Hashnode + err error + ) + err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + r, err = LoadRootHashnode(ctx, it.allocationID) + return err + }) it.assert(test, it.allocationID, r, err) }, diff --git a/code/go/0chain.net/blobbercore/reference/playlist.go b/code/go/0chain.net/blobbercore/reference/playlist.go index 71ecfa7ee..d7d793f17 100644 --- a/code/go/0chain.net/blobbercore/reference/playlist.go +++ b/code/go/0chain.net/blobbercore/reference/playlist.go @@ -22,26 +22,30 @@ type PlaylistFile struct { // LoadPlaylist load playlist func LoadPlaylist(ctx context.Context, allocationID, path, since string) ([]PlaylistFile, error) { + var files []PlaylistFile - db := datastore.GetStore().GetDB() + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) - sinceId := 0 + sinceId := 0 - if len(since) > 0 { - db.Raw("SELECT id FROM reference_objects WHERE allocation_id = ? and lookup_hash = ? ", allocationID, since).Row().Scan(&sinceId) //nolint: errcheck - } + if len(since) > 0 { + tx.Raw("SELECT id FROM reference_objects WHERE allocation_id = ? and lookup_hash = ? ", allocationID, since).Row().Scan(&sinceId) //nolint: errcheck + } - var files []PlaylistFile + db := tx.Table("reference_objects"). + Select([]string{"lookup_hash", "name", "path", "num_of_blocks", "parent_path", "size", "mimetype", "type"}).Order("id") + if sinceId > 0 { + db.Where("allocation_id = ? and parent_path = ? and type='f' and id > ? and name like '%.ts'", allocationID, path, sinceId) + } else { + db.Where("allocation_id = ? and parent_path = ? and type='f' and name like '%.ts'", allocationID, path) + } - db = db.Table("reference_objects"). - Select([]string{"lookup_hash", "name", "path", "num_of_blocks", "parent_path", "size", "mimetype", "type"}).Order("id") - if sinceId > 0 { - db.Where("allocation_id = ? and parent_path = ? and type='f' and id > ? and name like '%.ts'", allocationID, path, sinceId) - } else { - db.Where("allocation_id = ? and parent_path = ? and type='f' and name like '%.ts'", allocationID, path) - } + return db.Find(&files).Error - if err := db.Find(&files).Error; err != nil { + }) + + if err != nil { return nil, err } @@ -49,21 +53,22 @@ func LoadPlaylist(ctx context.Context, allocationID, path, since string) ([]Play } func LoadPlaylistFile(ctx context.Context, allocationID, lookupHash string) (*PlaylistFile, error) { - - db := datastore.GetStore().GetDB() - file := &PlaylistFile{} - result := db.Table("reference_objects"). - Select([]string{"lookup_hash", "name", "path", "num_of_blocks", "parent_path", "size", "mimetype", "type"}). - Where("allocation_id = ? and lookup_hash = ?", allocationID, lookupHash). - First(file) + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) + result := tx.Table("reference_objects"). + Select([]string{"lookup_hash", "name", "path", "num_of_blocks", "parent_path", "size", "mimetype", "type"}). + Where("allocation_id = ? and lookup_hash = ?", allocationID, lookupHash). + First(file) - escapedLookupHash := sanitizeString(lookupHash) - logging.Logger.Info("playlist", zap.String("allocation_id", allocationID), zap.String("lookup_hash", escapedLookupHash)) + escapedLookupHash := sanitizeString(lookupHash) + logging.Logger.Info("playlist", zap.String("allocation_id", allocationID), zap.String("lookup_hash", escapedLookupHash)) + return result.Error + }) - if result.Error != nil { - return nil, result.Error + if err != nil { + return nil, err } return file, nil diff --git a/code/go/0chain.net/blobbercore/reference/referencepath.go b/code/go/0chain.net/blobbercore/reference/referencepath.go index 1f5176a1b..d79499865 100644 --- a/code/go/0chain.net/blobbercore/reference/referencepath.go +++ b/code/go/0chain.net/blobbercore/reference/referencepath.go @@ -9,7 +9,8 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/common" - "gorm.io/gorm" + "github.com/0chain/blobber/code/go/0chain.net/core/logging" + "go.uber.org/zap" ) type ReferencePath struct { @@ -241,40 +242,47 @@ func GetRefs(ctx context.Context, allocationID, path, offsetPath, _type string, var pRefs []PaginatedRef path = filepath.Clean(path) - db := datastore.GetStore().GetDB() - db1 := db.Session(&gorm.Session{}) - db2 := db.Session(&gorm.Session{}) - wg := sync.WaitGroup{} wg.Add(2) go func() { - db1 = db1.Model(&Ref{}).Where("allocation_id = ?", allocationID). - Where(db1.Where("path = ?", path).Or("path LIKE ?", path+"%")) - if _type != "" { - db1 = db1.Where("type = ?", _type) - } - if level != 0 { - db1 = db1.Where("level = ?", level) - } + _ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) + db1 := tx.Model(&Ref{}).Where("allocation_id = ?", allocationID). + Where("path = ?", path).Or("path LIKE ?", path+"%") + if _type != "" { + db1 = db1.Where("type = ?", _type) + } + if level != 0 { + db1 = db1.Where("level = ?", level) + } + + db1 = db1.Where("path > ?", offsetPath) - db1 = db1.Where("path > ?", offsetPath) + db1 = db1.Order("path") + err = db1.Limit(pageLimit).Find(&pRefs).Error + wg.Done() + + return nil + }) - db1 = db1.Order("path") - err = db1.Limit(pageLimit).Find(&pRefs).Error - wg.Done() }() go func() { - db2 = db2.Model(&Ref{}).Where("allocation_id = ?", allocationID). - Where(db2.Where("path = ?", path).Or("path LIKE ?", path+"%")) - if _type != "" { - db2 = db2.Where("type = ?", _type) - } - if level != 0 { - db2 = db2.Where("level = ?", level) - } - db2.Count(&totalRows) - wg.Done() + _ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) + db2 := tx.Model(&Ref{}).Where("allocation_id = ?", allocationID). + Where("path = ?", path).Or("path LIKE ?", path+"%") + if _type != "" { + db2 = db2.Where("type = ?", _type) + } + if level != 0 { + db2 = db2.Where("level = ?", level) + } + db2.Count(&totalRows) + wg.Done() + + return nil + }) }() wg.Wait() if err != nil { @@ -298,49 +306,62 @@ func GetUpdatedRefs(ctx context.Context, allocationID, path, offsetPath, _type, var totalRows int64 var pRefs []PaginatedRef - db := datastore.GetStore().GetDB() - db1 := db.Session(&gorm.Session{}) - db2 := db.Session(&gorm.Session{}) wg := sync.WaitGroup{} wg.Add(2) go func() { - db1 = db1.Model(&Ref{}).Where("allocation_id = ?", allocationID). - Where(db1.Where("path = ?", path).Or("path LIKE ?", path+"%")) - if _type != "" { - db1 = db1.Where("type = ?", _type) - } - if level != 0 { - db1 = db1.Where("level = ?", level) - } - if updatedDate != "" { - db1 = db1.Where("updated_at > ?", updatedDate) - } + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) + db1 := tx.Model(&Ref{}).Where("allocation_id = ?", allocationID). + Where("path = ?", path).Or("path LIKE ?", path+"%") + if _type != "" { + db1 = db1.Where("type = ?", _type) + } + if level != 0 { + db1 = db1.Where("level = ?", level) + } + if updatedDate != "" { + db1 = db1.Where("updated_at > ?", updatedDate) + } - if offsetDate != "" { - db1 = db1.Where("(updated_at, path) > (?, ?)", offsetDate, offsetPath) + if offsetDate != "" { + db1 = db1.Where("(updated_at, path) > (?, ?)", offsetDate, offsetPath) + } + db1 = db1.Order("updated_at, path") + db1 = db1.Limit(pageLimit) + err = db1.Find(&pRefs).Error + wg.Done() + + return err + }) + if err != nil { + logging.Logger.Error("error", zap.Error(err)) } - db1 = db1.Order("updated_at, path") - db1 = db1.Limit(pageLimit) - err = db1.Find(&pRefs).Error - wg.Done() }() go func() { - db2 = db2.Model(&Ref{}).Where("allocation_id = ?", allocationID). - Where(db2.Where("path = ?", path).Or("path LIKE ?", path+"%")) - if _type != "" { - db2 = db2.Where("type > ?", level) - } - if level != 0 { - db2 = db2.Where("level = ?", level) - } - if updatedDate != "" { - db2 = db2.Where("updated_at > ?", updatedDate) + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) + db2 := tx.Model(&Ref{}).Where("allocation_id = ?", allocationID). + Where("path = ?", path).Or("path LIKE ?", path+"%") + if _type != "" { + db2 = db2.Where("type > ?", level) + } + if level != 0 { + db2 = db2.Where("level = ?", level) + } + if updatedDate != "" { + db2 = db2.Where("updated_at > ?", updatedDate) + } + err = db2.Count(&totalRows).Error + wg.Done() + + return err + }) + if err != nil { + logging.Logger.Error("error", zap.Error(err)) } - db2 = db2.Count(&totalRows) - wg.Done() }() wg.Wait() if err != nil { @@ -386,12 +407,11 @@ func GetRecentlyCreatedRefs( return } -func CountRefs(allocationID string) (int64, error) { +func CountRefs(ctx context.Context, allocationID string) (int64, error) { var totalRows int64 + tx := datastore.GetStore().GetTransaction(ctx) - db := datastore.GetStore().GetDB() - - err := db.Model(&Ref{}). + err := tx.Model(&Ref{}). Where("allocation_id = ?", allocationID). Count(&totalRows).Error diff --git a/code/go/0chain.net/blobbercore/stats/blobberstats.go b/code/go/0chain.net/blobbercore/stats/blobberstats.go index d2030a414..de98c2aaa 100644 --- a/code/go/0chain.net/blobbercore/stats/blobberstats.go +++ b/code/go/0chain.net/blobbercore/stats/blobberstats.go @@ -178,6 +178,7 @@ func (bs *BlobberStats) loadInfraStats(ctx context.Context) { func (bs *BlobberStats) loadDBStats() { bs.DBStats = &DBStats{Status: "✗"} + //todo hide inside wrapper to db db := datastore.GetStore().GetDB() sqldb, err := db.DB() if err != nil { @@ -197,7 +198,7 @@ func (bs *BlobberStats) loadFailedChallengeList(ctx context.Context) { } fcrd := fcrdI.(RequestData) - fcs, count, err := getAllFailedChallenges(fcrd.Offset, fcrd.Limit) + fcs, count, err := getAllFailedChallenges(ctx, fcrd.Offset, fcrd.Limit) if err != nil { Logger.Error("", zap.Any("err", err)) return @@ -496,7 +497,7 @@ type ReadMarkerEntity struct { func loadAllocReadMarkersStat(ctx context.Context, allocationID string) (*ReadMarkersStat, error) { var ( - db = datastore.GetStore().GetDB() + db = datastore.GetStore().GetTransaction(ctx) rme ReadMarkerEntity ) diff --git a/code/go/0chain.net/blobbercore/stats/challengestats.go b/code/go/0chain.net/blobbercore/stats/challengestats.go index 95f6688ac..12e93e77a 100644 --- a/code/go/0chain.net/blobbercore/stats/challengestats.go +++ b/code/go/0chain.net/blobbercore/stats/challengestats.go @@ -1,6 +1,7 @@ package stats import ( + "context" "time" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" @@ -34,8 +35,8 @@ type ChallengeEntity struct { func (ChallengeEntity) TableName() string { return "challenges" } -func getAllFailedChallenges(offset, limit int) ([]ChallengeEntity, int, error) { - db := datastore.GetStore().GetDB() +func getAllFailedChallenges(ctx context.Context, offset, limit int) ([]ChallengeEntity, int, error) { + db := datastore.GetStore().GetTransaction(ctx) crs := []ChallengeEntity{} err := db.Offset(offset).Limit(limit).Order("challenge_id DESC").Table(ChallengeEntity{}.TableName()).Find(&crs, ChallengeEntity{Result: 2}).Error if err != nil { diff --git a/code/go/0chain.net/blobbercore/stats/handler.go b/code/go/0chain.net/blobbercore/stats/handler.go index 607a236e6..96d8f1302 100644 --- a/code/go/0chain.net/blobbercore/stats/handler.go +++ b/code/go/0chain.net/blobbercore/stats/handler.go @@ -479,9 +479,6 @@ func setStatsRequestDataInContext(r *http.Request, ctx context.Context) context. } func StatsJSONHandler(ctx context.Context, r *http.Request) (interface{}, error) { - ctx = datastore.GetStore().CreateTransaction(ctx) - db := datastore.GetStore().GetTransaction(ctx) - defer db.Rollback() bs := LoadBlobberStats(ctx) return bs, nil } @@ -489,9 +486,6 @@ func StatsJSONHandler(ctx context.Context, r *http.Request) (interface{}, error) func GetStatsHandler(ctx context.Context, r *http.Request) (interface{}, error) { q := r.URL.Query() ctx = context.WithValue(ctx, constants.ContextKeyAllocation, q.Get("allocation_id")) - ctx = datastore.GetStore().CreateTransaction(ctx) - db := datastore.GetStore().GetTransaction(ctx) - defer db.Rollback() allocationID := ctx.Value(constants.ContextKeyAllocation).(string) bs := &BlobberStats{} if allocationID != "" { diff --git a/code/go/0chain.net/blobbercore/writemarker/mutex.go b/code/go/0chain.net/blobbercore/writemarker/mutex.go index f3d56c296..fa8c2e707 100644 --- a/code/go/0chain.net/blobbercore/writemarker/mutex.go +++ b/code/go/0chain.net/blobbercore/writemarker/mutex.go @@ -52,7 +52,7 @@ func (m *Mutex) Lock(ctx context.Context, allocationID, connectionID string) (*L l.Lock() defer l.Unlock() - db := datastore.GetStore().GetDB() + db := datastore.GetStore().GetTransaction(ctx) var lock WriteLock err := db.Table(TableNameWriteLock).Where("allocation_id=?", allocationID).First(&lock).Error @@ -119,7 +119,7 @@ func (*Mutex) Unlock(ctx context.Context, allocationID string, connectionID stri return nil } - db := datastore.GetStore().GetDB() + db := datastore.GetStore().GetTransaction(ctx) err := db.Exec("DELETE FROM write_locks WHERE allocation_id = ? and connection_id = ? ", allocationID, connectionID).Error if err != nil { diff --git a/code/go/0chain.net/blobbercore/writemarker/mutext_test.go b/code/go/0chain.net/blobbercore/writemarker/mutext_test.go index 5055576ad..0c1532f4d 100644 --- a/code/go/0chain.net/blobbercore/writemarker/mutext_test.go +++ b/code/go/0chain.net/blobbercore/writemarker/mutext_test.go @@ -148,7 +148,14 @@ func TestMutext_LockShouldWork(t *testing.T) { if it.mock != nil { it.mock() } - r, err := m.Lock(context.TODO(), it.allocationID, it.connectionID) + var ( + r *LockResult + err error + ) + err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + r, err = m.Lock(ctx, it.allocationID, it.connectionID) + return nil + }) it.assert(test, r, err) diff --git a/code/go/0chain.net/blobbercore/writemarker/protocol.go b/code/go/0chain.net/blobbercore/writemarker/protocol.go index b11c82366..8c6150e0b 100644 --- a/code/go/0chain.net/blobbercore/writemarker/protocol.go +++ b/code/go/0chain.net/blobbercore/writemarker/protocol.go @@ -83,7 +83,7 @@ func (wme *WriteMarkerEntity) VerifyMarker(ctx context.Context, dbAllocation *al } currTime := common.Now() - // blobber clock is allowed to be 10 seconds behing the current time + // blobber clock is allowed to be 10 seconds behind the current time if wme.WM.Timestamp > currTime+10 { return common.NewError("write_marker_validation_failed", "Write Marker timestamp is in the future") } diff --git a/code/go/0chain.net/blobbercore/writemarker/worker.go b/code/go/0chain.net/blobbercore/writemarker/worker.go index ee73ac551..56292c2c0 100644 --- a/code/go/0chain.net/blobbercore/writemarker/worker.go +++ b/code/go/0chain.net/blobbercore/writemarker/worker.go @@ -20,12 +20,10 @@ var ( ) func SetupWorkers(ctx context.Context) { - db := datastore.GetStore().GetDB() var res []allocation.Res - err := db.Transaction(func(tx *gorm.DB) error { - c := datastore.GetStore().WithTransaction(ctx, tx) - res = allocation.Repo.GetAllocationIds(c) + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + res = allocation.Repo.GetAllocationIds(ctx) return nil }) if err != nil && err != gorm.ErrRecordNotFound { @@ -137,11 +135,12 @@ func startRedeem(ctx context.Context) { logging.Logger.Info("Start redeeming writemarkers") writeMarkerChan = make(chan *WriteMarkerEntity, 200) go startRedeemWorker(ctx) - db := datastore.GetStore().GetDB() var writemarkers []*WriteMarkerEntity - - err := db.Not(WriteMarkerEntity{Status: Committed}).Find(&writemarkers).Error + err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + tx := datastore.GetStore().GetTransaction(ctx) + return tx.Not(WriteMarkerEntity{Status: Committed}).Find(&writemarkers).Error + }) if err != nil && err != gorm.ErrRecordNotFound { logging.Logger.Error("Error redeeming the write marker. failed to load allocation's writemarker ", zap.Any("error", err))