Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Case 3 and case 4 #1198

Merged
merged 22 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (rf *RenameFileChange) DeleteTempFile() error {
return nil
}

func (rf *RenameFileChange) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
func (rf *RenameFileChange) applyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change required?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're trying not to re-write the code for conductor tests, that's why we lower-cased the fn. that applies the main logic and kept the upper-case named fn. to be a switch between main flow and conductor testing flow. Is this contradicting with any other code?

allocationRoot string, ts common.Timestamp, _ map[string]string) (*reference.Ref, error) {

if rf.Path == "/" {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//go:build integration_tests
// +build integration_tests

package allocation

import (
"context"
"errors"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/conductor/conductrpc"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/node"
)

func (rf *RenameFileChange) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
allocationRoot string, ts common.Timestamp, _ map[string]string) (*reference.Ref, error) {

state := conductrpc.Client().State()
if state.FailRenameCommit != nil {
for _, nodeId := range state.FailRenameCommit {
if nodeId == node.Self.ID {
return nil, errors.New("error directed by conductor")
}
}
}
return rf.applyChange(ctx, rootRef, change, allocationRoot, ts, nil)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//go:build !integration_tests
// +build !integration_tests

package allocation

import (
"context"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
)

func (rf *RenameFileChange) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
allocationRoot string, ts common.Timestamp, _ map[string]string) (*reference.Ref, error) {

return rf.applyChange(ctx, rootRef, change, allocationRoot, ts, nil)
}
5 changes: 5 additions & 0 deletions code/go/0chain.net/conductor/conductrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,8 @@ func (c *client) blobberCommitted(blobberID string) (err error) {
err = c.client.Call("Server.BlobberCommitted", blobberID, nil)
return
}

func (c *client) sendFileMetaRoot(m map[string]string) (err error) {
err = c.client.Call("Server.GetFileMetaRoot", m, nil)
return
}
18 changes: 18 additions & 0 deletions code/go/0chain.net/conductor/conductrpc/entity.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package conductrpc

import (
"context"
"log"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -31,6 +32,10 @@ func (e *Entity) State() (state *State) {
// SetState sets current state.
func (e *Entity) SetState(state *State) {
e.state.Store(state) // update

if state.GetFileMetaRoot {
go SendFileMetaRoot()
}
}

// NewEntity creates RPC client for integration tests.
Expand Down Expand Up @@ -107,6 +112,19 @@ func (e *Entity) BlobberCommitted(blobberID string) {
}
}


func (e *Entity) SendFileMetaRoot(blobberID, fileMetaRoot string, ctxCncl context.CancelFunc) {
m := map[string]string{
"blobber_id": blobberID,
"file_meta_root": fileMetaRoot,
}
err := e.client.sendFileMetaRoot(m)
if err != nil {
return
}
ctxCncl()
}

//
// global
//
Expand Down
64 changes: 64 additions & 0 deletions code/go/0chain.net/conductor/conductrpc/file_meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package conductrpc

import (
"context"

"log"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"

"github.com/0chain/blobber/code/go/0chain.net/core/node"
)

// alreadyRunning is simple indicator that given function is running
// no need to acquire mutex lock. It does not matter if at a time it
// somehow runs the given function multiple times. Since it takes some
// time to acquire state from rpc server there is no concurrent running
var alreadyRunning bool

func SendFileMetaRoot() {
if alreadyRunning {
return
}
alreadyRunning = true
defer func() {
alreadyRunning = false
}()

ctx, ctxCncl := context.WithCancel(context.TODO())
defer ctxCncl()

for {
select {
case <-ctx.Done():
return
default:
}

s := global.State()
if s.GetFileMetaRoot {
fmr, err := getFileMetaRoot()
if err != nil {
log.Printf("Error: %v", err)
continue
}

global.SendFileMetaRoot(node.Self.ID, fmr, ctxCncl)
}
}
}

func getFileMetaRoot() (string, error) {
db := datastore.GetStore().GetDB()
var fmr string

// It will work fine because this blobber will have only single allocation
// created by conductor
err := db.Raw("SELECT file_meta_root FROM allocations LIMIT 1").Scan(&fmr).Error

if err != nil {
return "", err
}

return fmr, nil
}
2 changes: 2 additions & 0 deletions code/go/0chain.net/conductor/conductrpc/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type State struct {
BlobberDelete BlobberDelete
AdversarialValidator AdversarialValidator
StopWMCommit *bool
FailRenameCommit []string
GetFileMetaRoot bool
}

// Name returns NodeName by given NodeID.
Expand Down
Loading