Skip to content

Commit

Permalink
Fix cleanup worker (#1262)
Browse files Browse the repository at this point in the history
* add logs to check alloc id

* fix empty allocation id
  • Loading branch information
Hitenjain14 authored Sep 22, 2023
1 parent 46015ce commit 63e1187
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ func (cc *AllocationChangeCollector) ComputeProperties() {
if acp == nil {
continue // unknown operation (impossible case?)
}

if err := acp.Unmarshal(change.Input); err != nil { // error is not handled
logging.Logger.Error("AllocationChangeCollector_unmarshal", zap.Error(err))
}
Expand Down
28 changes: 5 additions & 23 deletions code/go/0chain.net/blobbercore/challenge/challenge.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,25 +107,19 @@ func syncOpenChallenges(ctx context.Context) {

}

func validateOnValidators(c *ChallengeEntity) {
func validateOnValidators(ctx context.Context, c *ChallengeEntity) error {

logging.Logger.Info("[challenge]validate: ",
zap.Any("challenge", c),
zap.String("challenge_id", c.ChallengeID),
)

ctx := datastore.GetStore().CreateTransaction(context.TODO())
defer ctx.Done()

tx := datastore.GetStore().GetTransaction(ctx)

if err := CreateChallengeTiming(c.ChallengeID, c.CreatedAt); err != nil {
logging.Logger.Error("[challengetiming]add: ",
zap.String("challenge_id", c.ChallengeID),
zap.Error(err))
deleteChallenge(c.RoundCreatedAt)
tx.Rollback()
return
return err
}

createdTime := common.ToTime(c.CreatedAt)
Expand All @@ -143,10 +137,8 @@ func validateOnValidators(c *ChallengeEntity) {
zap.String("validationTickets", string(c.ValidationTicketsString)),
zap.String("ObjectPath", string(c.ObjectPathString)),
zap.Error(err))
tx.Rollback()

c.CancelChallenge(ctx, err)
return
return nil
}

if err := c.LoadValidationTickets(ctx); err != nil {
Expand All @@ -156,17 +148,7 @@ func validateOnValidators(c *ChallengeEntity) {
zap.Error(err))
//TODO: Should we delete the challenge from map or send it back to the todo channel?
deleteChallenge(c.RoundCreatedAt)
tx.Rollback()
return
}

if err := tx.Commit().Error; err != nil {
logging.Logger.Error("[challenge]validate(Commit): ",
zap.Any("challenge_id", c.ChallengeID),
zap.Time("created", createdTime),
zap.Error(err))
tx.Rollback()
return
return err
}

completedValidation := time.Now()
Expand All @@ -181,7 +163,7 @@ func validateOnValidators(c *ChallengeEntity) {
logging.Logger.Info("[challenge]validate: ",
zap.Any("challenge_id", c.ChallengeID),
zap.Time("created", createdTime))

return nil
}

func (c *ChallengeEntity) getCommitTransaction() (*transaction.Transaction, error) {
Expand Down
4 changes: 3 additions & 1 deletion code/go/0chain.net/blobbercore/challenge/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ func processChallenge(ctx context.Context, it *ChallengeEntity) {
logging.Logger.Info("processing_challenge",
zap.String("challenge_id", it.ChallengeID))

validateOnValidators(it)
_ = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
return validateOnValidators(ctx, it)
})
}

func commitOnChainWorker(ctx context.Context) {
Expand Down
4 changes: 4 additions & 0 deletions code/go/0chain.net/blobbercore/filestore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,10 @@ func (fs *FileStore) DeleteFile(allocID, validationRoot string) error {
}

func (fs *FileStore) DeleteTempFile(allocID, conID string, fd *FileInputData) error {
if allocID == "" {
logging.Logger.Error("invalid_allocation_id", zap.String("connection_id", conID), zap.Any("file_data", fd))
return common.NewError("invalid_allocation_id", "Allocation id cannot be empty")
}
fileObjectPath := fs.getTempPathForFile(allocID, fd.Name, encryption.Hash(fd.Path), conID)

finfo, err := os.Stat(fileObjectPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func (cmd *UpdateFileCommand) reloadChange(connectionObj *allocation.AllocationC

// UpdateChange add UpdateFileChanger in db
func (cmd *UpdateFileCommand) UpdateChange(ctx context.Context, connectionObj *allocation.AllocationChangeCollector) error {
cmd.fileChanger.AllocationID = connectionObj.AllocationID
for _, c := range connectionObj.Changes {
filePath, _ := c.GetOrParseAffectedFilePath()
if c.Operation != sdkConst.FileOperationUpdate || cmd.fileChanger.Path != filePath {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func (cmd *UploadFileCommand) reloadChange(connectionObj *allocation.AllocationC

// UpdateChange replace AddFileChange in db
func (cmd *UploadFileCommand) UpdateChange(ctx context.Context, connectionObj *allocation.AllocationChangeCollector) error {
cmd.fileChanger.AllocationID = connectionObj.AllocationID
for _, c := range connectionObj.Changes {
filePath, _ := c.GetOrParseAffectedFilePath()
if c.Operation != constants.FileOperationInsert || cmd.fileChanger.Path != filePath {
Expand Down
9 changes: 5 additions & 4 deletions code/go/0chain.net/blobbercore/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func setupHandlers(r *mux.Router) {
// r.HandleFunc("/_statsJSON", common.AuthenticateAdmin(common.ToJSONResponse(stats.StatsJSONHandler)))
r.HandleFunc("/_statsJSON", RateLimitByCommmitRL(common.ToJSONResponse(stats.StatsJSONHandler)))
// r.HandleFunc("/_cleanupdisk", common.AuthenticateAdmin(common.ToJSONResponse(WithReadOnlyConnection(CleanupDiskHandler))))
r.HandleFunc("/_cleanupdisk", RateLimitByCommmitRL(common.ToJSONResponse(WithReadOnlyConnection(CleanupDiskHandler))))
// r.HandleFunc("/_cleanupdisk", RateLimitByCommmitRL(common.ToJSONResponse(WithReadOnlyConnection(CleanupDiskHandler))))
// r.HandleFunc("/getstats", common.AuthenticateAdmin(common.ToJSONResponse(stats.GetStatsHandler)))
r.HandleFunc("/getstats", RateLimitByCommmitRL(common.ToJSONResponse(WithReadOnlyConnection(stats.GetStatsHandler))))
// r.HandleFunc("/challengetimings", common.AuthenticateAdmin(common.ToJSONResponse(GetChallengeTimings)))
Expand Down Expand Up @@ -271,11 +271,12 @@ func setupHandlers(r *mux.Router) {
func WithReadOnlyConnection(handler common.JSONResponderF) common.JSONResponderF {
return func(ctx context.Context, r *http.Request) (interface{}, error) {
ctx = GetMetaDataStore().CreateTransaction(ctx)

res, err := handler(ctx, r)
tx := GetMetaDataStore().GetTransaction(ctx)
defer func() {
GetMetaDataStore().GetTransaction(ctx).Rollback()
tx.Rollback()
}()

res, err := handler(ctx, r)
return res, err
}
}
Expand Down

0 comments on commit 63e1187

Please sign in to comment.