Skip to content

Commit

Permalink
Fix/replace transaction (#1192)
Browse files Browse the repository at this point in the history
* add rb logs

* fix rollback wm check

* never replace existent transaction

* never replace existent transaction

* close transaction

* revert

* fix checkout build-&-publish-docker-image.yml

* configuration

* configuration

* cleaned up transaction logic

* cleaned up transaction logic

* cleaned up transaction logic

* cleaned up transaction logic

* fixed tests

* fixed tests

---------

Co-authored-by: Hitenjain14 <[email protected]>
Co-authored-by: shahnawaz-creator <[email protected]>
  • Loading branch information
3 people authored Aug 7, 2023
1 parent b0d46cf commit 3a1a405
Show file tree
Hide file tree
Showing 39 changed files with 534 additions and 576 deletions.
39 changes: 20 additions & 19 deletions code/go/0chain.net/blobber/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,29 +105,30 @@ func setupConfig(configDir string, deploymentMode int) {
func reloadConfig() error {
fmt.Print("> reload config")

db := datastore.GetStore().GetDB()
return datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
s, ok := config.Get(ctx, datastore.GetStore().GetDB())
if ok {
if err := s.CopyTo(&config.Configuration); err != nil {
return err
}
fmt.Print(" [OK]\n")
return nil
}

config.Configuration.Capacity = viper.GetInt64("capacity")

config.Configuration.MinLockDemand = viper.GetFloat64("min_lock_demand")
config.Configuration.NumDelegates = viper.GetInt("num_delegates")
config.Configuration.ReadPrice = viper.GetFloat64("read_price")
config.Configuration.ServiceCharge = viper.GetFloat64("service_charge")
config.Configuration.WritePrice = viper.GetFloat64("write_price")

s, ok := config.Get(context.TODO(), db)
if ok {
if err := s.CopyTo(&config.Configuration); err != nil {
if err := config.Update(ctx, datastore.GetStore().GetDB()); err != nil {
return err
}

fmt.Print(" [OK]\n")
return nil
}

config.Configuration.Capacity = viper.GetInt64("capacity")

config.Configuration.MinLockDemand = viper.GetFloat64("min_lock_demand")
config.Configuration.NumDelegates = viper.GetInt("num_delegates")
config.Configuration.ReadPrice = viper.GetFloat64("read_price")
config.Configuration.ServiceCharge = viper.GetFloat64("service_charge")
config.Configuration.WritePrice = viper.GetFloat64("write_price")

if err := config.Update(context.TODO(), db); err != nil {
return err
}

fmt.Print(" [OK]\n")
return nil
})
}
28 changes: 28 additions & 0 deletions code/go/0chain.net/blobber/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"net/http"
"net/http/pprof"
"runtime"
"strconv"
"sync"
Expand Down Expand Up @@ -53,19 +54,38 @@ func startServer(wg *sync.WaitGroup, r *mux.Router, mode string, port int, isTls
//address := publicIP + ":" + portString
address := ":" + strconv.Itoa(port)
var server *http.Server
var profServer *http.Server

if config.Development() {
// No WriteTimeout setup to enable pprof
server = &http.Server{
Addr: address,
ReadHeaderTimeout: 30 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 30 * time.Second,
MaxHeaderBytes: 1 << 20,
Handler: r,
}

pprofMux := http.NewServeMux()
profServer = &http.Server{
Addr: fmt.Sprintf(":%d", port-1000),
ReadTimeout: 30 * time.Second,
MaxHeaderBytes: 1 << 20,
Handler: pprofMux,
}
initProfHandlers(pprofMux)
go func() {
err2 := profServer.ListenAndServe()
logging.Logger.Error("Http server shut down", zap.Error(err2))
}()

} else {
server = &http.Server{
Addr: address,
ReadHeaderTimeout: 30 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 30 * time.Second,
MaxHeaderBytes: 1 << 20,
Expand All @@ -91,3 +111,11 @@ func initHandlers(r *mux.Router) {
handler.SetupSwagger()
common.SetAdminCredentials()
}

func initProfHandlers(mux *http.ServeMux) {
mux.HandleFunc("/debug/pprof/", handler.RateLimitByGeneralRL(pprof.Index))
mux.HandleFunc("/debug/pprof/cmdline", handler.RateLimitByGeneralRL(pprof.Cmdline))
mux.HandleFunc("/debug/pprof/profile", handler.RateLimitByGeneralRL(pprof.Profile))
mux.HandleFunc("/debug/pprof/symbol", handler.RateLimitByGeneralRL(pprof.Symbol))
mux.HandleFunc("/debug/pprof/trace", handler.RateLimitByGeneralRL(pprof.Trace))
}
7 changes: 4 additions & 3 deletions code/go/0chain.net/blobber/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/handler"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/readmarker"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"

"go.uber.org/zap"
Expand All @@ -30,13 +29,15 @@ func setupWorkers(ctx context.Context) {
// startRefreshSettings sync settings from blockchain
func startRefreshSettings(ctx context.Context) {
const REPEAT_DELAY = 60 * 3 // 3 minutes
var err error
for {
select {
case <-ctx.Done():
return
case <-time.After(REPEAT_DELAY * time.Second):
_, err = config.ReloadFromChain(common.GetRootContext(), datastore.GetStore().GetDB())
err := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
_, e := config.ReloadFromChain(ctx, datastore.GetStore().GetDB())
return e
})
if err != nil {
logging.Logger.Warn("failed to refresh blobber settings from chain", zap.Error(err))
continue
Expand Down
203 changes: 98 additions & 105 deletions code/go/0chain.net/blobbercore/allocation/allocationchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,154 +254,147 @@ func (a *AllocationChangeCollector) MoveToFilestore(ctx context.Context) (err er

logging.Logger.Info("Move to filestore", zap.String("allocation_id", a.AllocationID))

tx := datastore.GetStore().GetDB().Begin()

var refs []*Result
limitCh := make(chan struct{}, 10)
wg := &sync.WaitGroup{}

err = tx.Model(&reference.Ref{}).Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).Select("id", "validation_root", "thumbnail_hash", "prev_validation_root", "prev_thumbnail_hash").Where("allocation_id=? AND is_precommit=? AND type=?", a.AllocationID, true, reference.FILE).
FindInBatches(&refs, 50, func(tx *gorm.DB, batch int) error {
e := datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
tx := datastore.GetStore().GetTransaction(ctx)
err = tx.Model(&reference.Ref{}).Clauses(clause.Locking{Strength: "NO KEY UPDATE"}).Select("id", "validation_root", "thumbnail_hash", "prev_validation_root", "prev_thumbnail_hash").Where("allocation_id=? AND is_precommit=? AND type=?", a.AllocationID, true, reference.FILE).
FindInBatches(&refs, 50, func(tx *gorm.DB, batch int) error {

for _, ref := range refs {
for _, ref := range refs {

var count int64
if ref.PrevValidationRoot != "" {
tx.Model(&reference.Ref{}).
Where("allocation_id=? AND validation_root=?", a.AllocationID, ref.PrevValidationRoot).
Count(&count)
}
var count int64
if ref.PrevValidationRoot != "" {
tx.Model(&reference.Ref{}).
Where("allocation_id=? AND validation_root=?", a.AllocationID, ref.PrevValidationRoot).
Count(&count)
}

limitCh <- struct{}{}
wg.Add(1)
limitCh <- struct{}{}
wg.Add(1)

go func(ref *Result) {
defer func() {
<-limitCh
wg.Done()
}()
go func(ref *Result) {
defer func() {
<-limitCh
wg.Done()
}()

if count == 0 && ref.PrevValidationRoot != "" {
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevValidationRoot)
if count == 0 && ref.PrevValidationRoot != "" {
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevValidationRoot)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
zap.String("validation_root", ref.ValidationRoot))
}
}
err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ValidationRoot)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
logging.Logger.Error(fmt.Sprintf("Error while moving file: %s", err.Error()),
zap.String("validation_root", ref.ValidationRoot))
}
}
err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ValidationRoot)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while moving file: %s", err.Error()),
zap.String("validation_root", ref.ValidationRoot))
}

if ref.ThumbnailHash != "" && ref.ThumbnailHash != ref.PrevThumbnailHash {
if ref.PrevThumbnailHash != "" {
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevThumbnailHash)
if ref.ThumbnailHash != "" && ref.ThumbnailHash != ref.PrevThumbnailHash {
if ref.PrevThumbnailHash != "" {
err := filestore.GetFileStore().DeleteFromFilestore(a.AllocationID, ref.PrevThumbnailHash)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail file: %s", err.Error()),
zap.String("thumbnail_hash", ref.ThumbnailHash))
}
}
err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ThumbnailHash)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail file: %s", err.Error()),
logging.Logger.Error(fmt.Sprintf("Error while moving thumbnail file: %s", err.Error()),
zap.String("thumbnail_hash", ref.ThumbnailHash))
}
}
err := filestore.GetFileStore().MoveToFilestore(a.AllocationID, ref.ThumbnailHash)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while moving thumbnail file: %s", err.Error()),
zap.String("thumbnail_hash", ref.ThumbnailHash))
}
}

}(ref)
}

return nil
}).Error
}(ref)
}

wg.Wait()
return nil
}).Error

if err != nil {
logging.Logger.Error("Error while moving to filestore", zap.Error(err))
tx.Rollback()
return err
}
wg.Wait()

err = tx.Exec("UPDATE reference_objects SET is_precommit=?, prev_validation_root=validation_root, prev_thumbnail_hash=thumbnail_hash WHERE allocation_id=? AND is_precommit=? AND deleted_at is NULL", false, a.AllocationID, true).Error
if err != nil {
logging.Logger.Error("Error while moving to filestore", zap.Error(err))
return err
}

if err != nil {
tx.Rollback()
return err
return tx.Exec("UPDATE reference_objects SET is_precommit=?, prev_validation_root=validation_root, prev_thumbnail_hash=thumbnail_hash WHERE allocation_id=? AND is_precommit=? AND deleted_at is NULL", false, a.AllocationID, true).Error
})
if e != nil {
return e
}
tx.Commit()

return deleteFromFileStore(ctx, a.AllocationID)
}

func deleteFromFileStore(ctx context.Context, allocationID string) error {

db := datastore.GetStore().GetDB().Begin()
limitCh := make(chan struct{}, 10)
wg := &sync.WaitGroup{}
var results []Result

err := db.Model(&reference.Ref{}).Unscoped().Select("id", "validation_root", "thumbnail_hash").
Where("allocation_id=? AND is_precommit=? AND type=? AND deleted_at is not NULL", allocationID, true, reference.FILE).
FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {
return datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
db := datastore.GetStore().GetTransaction(ctx)

for _, res := range results {
var count int64
tx.Model(&reference.Ref{}).
Where("allocation_id=? AND validation_root=?", allocationID, res.ValidationRoot).
Count(&count)
err := db.Model(&reference.Ref{}).Unscoped().Select("id", "validation_root", "thumbnail_hash").
Where("allocation_id=? AND is_precommit=? AND type=? AND deleted_at is not NULL", allocationID, true, reference.FILE).
FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {

if count != 0 && res.ThumbnailHash == "" {
continue
}
for _, res := range results {
var count int64
tx.Model(&reference.Ref{}).
Where("allocation_id=? AND validation_root=?", allocationID, res.ValidationRoot).
Count(&count)

if count != 0 && res.ThumbnailHash == "" {
continue
}

limitCh <- struct{}{}
wg.Add(1)
limitCh <- struct{}{}
wg.Add(1)

go func(res Result, count int64) {
defer func() {
<-limitCh
wg.Done()
}()
go func(res Result, count int64) {
defer func() {
<-limitCh
wg.Done()
}()

if count == 0 {
err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ValidationRoot)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
zap.String("validation_root", res.ValidationRoot))
if count == 0 {
err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ValidationRoot)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting file: %s", err.Error()),
zap.String("validation_root", res.ValidationRoot))
}
}
}

if res.ThumbnailHash != "" {
err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ThumbnailHash)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail: %s", err.Error()),
zap.String("thumbnail", res.ThumbnailHash))
if res.ThumbnailHash != "" {
err := filestore.GetFileStore().DeleteFromFilestore(allocationID, res.ThumbnailHash)
if err != nil {
logging.Logger.Error(fmt.Sprintf("Error while deleting thumbnail: %s", err.Error()),
zap.String("thumbnail", res.ThumbnailHash))
}
}
}

}(res, count)
}(res, count)

}
return nil
}).Error
}
return nil
}).Error

wg.Wait()
if err != nil && err != gorm.ErrRecordNotFound {
logging.Logger.Error("DeleteFromFileStore", zap.Error(err))
db.Rollback()
return err
}
wg.Wait()
if err != nil && err != gorm.ErrRecordNotFound {
logging.Logger.Error("DeleteFromFileStore", zap.Error(err))
return err
}

err = db.Model(&reference.Ref{}).Unscoped().
Delete(&reference.Ref{},
"allocation_id = ? AND deleted_at IS NOT NULL",
allocationID).Error
if err != nil {
db.Rollback()
return err
}
db.Commit()
return nil
return db.Model(&reference.Ref{}).Unscoped().
Delete(&reference.Ref{},
"allocation_id = ? AND deleted_at IS NOT NULL",
allocationID).Error
})
}

// Note: We are also fetching refPath for srcPath in copy operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (rf *CopyFileChange) DeleteTempFile() error {
func (rf *CopyFileChange) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
allocationRoot string, ts common.Timestamp, fileIDMeta map[string]string) (*reference.Ref, error) {

totalRefs, err := reference.CountRefs(rf.AllocationID)
totalRefs, err := reference.CountRefs(ctx, rf.AllocationID)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 3a1a405

Please sign in to comment.