Skip to content

Commit

Permalink
Merge pull request #2651 from 0chain/add/conductor-tests-for-blobber-…
Browse files Browse the repository at this point in the history
…and-validator

Add/conductor tests for blobber and validator
  • Loading branch information
dabasov authored Sep 11, 2023
2 parents 1a63fd0 + dc67b59 commit 0fb5e3b
Show file tree
Hide file tree
Showing 43 changed files with 1,419 additions and 212 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/build-integration-tests-images.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ jobs:
make build-mocks
bash ./docker.local/bin/init.setup.sh
- name: Install dependencies
run: |
git clone --recursive https://github.com/herumi/bls
cd bls
sudo make MCL_USE_GMP=0 -C mcl -j $(nproc) lib/libmclbn256.so install
sudo make MCL_USE_GMP=0 MCL_DIR=$(pwd)/mcl -j $(nproc) lib/libbls256.so install
cd ..
- name: Docker Network Setup.
run: |
bash ./docker.local/bin/setup.network.sh || true
Expand All @@ -53,5 +60,7 @@ jobs:
- name: Run Basic conductor tests
run: |
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib
export LD_LIBRARY_PATH
./docker.local/bin/start.conductor.sh basic
14 changes: 14 additions & 0 deletions code/go/0chain.net/chaincore/chain/post_finalize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//go:build !integration_tests
// +build !integration_tests

package chain

import (
"context"

"0chain.net/chaincore/block"
)

func (c *Chain) postFinalize(ctx context.Context, fb *block.Block) error {
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
//go:build integration_tests
// +build integration_tests

package chain

import (
"context"

"0chain.net/chaincore/block"
"0chain.net/chaincore/transaction"
crpc "0chain.net/conductor/conductrpc"
"github.com/0chain/common/core/logging"
"go.uber.org/zap"
)

type TxnHandler func(txn *transaction.Transaction, client *crpc.Entity) error

var txnHandlers = map[string] TxnHandler{
"generate_challenge": func(txn *transaction.Transaction, client *crpc.Entity) error{
client.ChallengeGenerated(client.State().GenerateChallenge.BlobberID)
return nil
},
"challenge_response": func(txn *transaction.Transaction, client *crpc.Entity) error{
switch txn.TransactionOutput {
case "challenge passed by blobber":
status := 0
if txn.Status == 1 {
status = 1
}
client.SendChallengeStatus(map[string]interface{}{
"blobber_id": txn.ClientID,
"status": status,
})
case "Challenge Failed by Blobber":
client.SendChallengeStatus(map[string]interface{}{
"error": txn.TransactionData,
"status": 0,
"response": txn.TransactionOutput,
"blobber_id": txn.ClientID,
})
}
return nil
},
}

func (c *Chain) postFinalize(ctx context.Context, fb *block.Block) error {
client := crpc.Client()
for _, txn := range fb.Txns {
handler, ok := txnHandlers[txn.FunctionName]
if !ok {
continue
}
logging.Logger.Info("post_finalize processing txn",
zap.Any("function_name", txn.FunctionName),
zap.Any("hash", txn.Hash),
zap.Any("output", txn.TransactionOutput),
)
err := handler(txn, client)
if err != nil {
logging.Logger.Error("post_finalize txn error",
zap.Int64("round", fb.Round),
zap.String("hash", fb.Hash),
zap.Any("txn", txn),
zap.Error(err),
)
}
}

return nil
}
6 changes: 5 additions & 1 deletion code/go/0chain.net/chaincore/chain/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,11 @@ func (c *Chain) finalizeBlockProcess(ctx context.Context, fb *block.Block, bsh B

}
// finalize
return c.finalizeBlock(ctx, fb, bsh)
if err := c.finalizeBlock(ctx, fb, bsh); err != nil {
return err
}

return c.postFinalize(ctx, fb)
}

/*PruneClientStateWorker - a worker that prunes the client state */
Expand Down
125 changes: 118 additions & 7 deletions code/go/0chain.net/conductor/conductor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"0chain.net/conductor/config"
"0chain.net/conductor/config/cases"
"0chain.net/conductor/dirs"
"0chain.net/conductor/utils"
)

//
Expand All @@ -25,7 +26,7 @@ import (
func (r *Runner) setupTimeout(tm time.Duration) {
r.timer = time.NewTimer(tm)
if tm <= 0 {
<-r.timer.C // drain zero timeout
<-r.timer.C // drain zero timeout so that wherever it is waited upon it waits indefinitely
}
}

Expand Down Expand Up @@ -120,6 +121,15 @@ func (r *Runner) SetEnv(env map[string]string) (err error) {
// control nodes
//

func (r *Runner) GetNodes() map[config.NodeName]config.NodeID {
m := make(map[config.NodeName]config.NodeID)
for _, n := range r.conf.Nodes {
m[n.Name] = n.ID
}

return m
}

// Start nodes, or start and lock them.
func (r *Runner) Start(names []NodeName, lock bool,
tm time.Duration) (err error) {
Expand Down Expand Up @@ -350,6 +360,90 @@ func (r *Runner) WaitNoProgress(wait time.Duration) (err error) {
return
}

func (r *Runner) GenerateChallenge(c *config.GenerateChallege) error {
if r.verbose {
log.Print(" [INF] setting generate challenge info")
}

r.chalConf = c
return nil
}

func (r *Runner) WaitForChallengeGeneration(timeout time.Duration) {
if r.verbose {
log.Print(" [INF] waiting for blockchain to generate challenge")
}

r.setupTimeout(timeout)
r.chalConf.WaitOnChallengeGeneration = true
}

func (r *Runner) WaitOnBlobberCommit(timeout time.Duration) {
if r.verbose {
log.Print(" [INF] waiting for blobber to commit writemarker")
}

r.setupTimeout(timeout)
r.chalConf.WaitOnBlobberCommit = true
}

func (r *Runner) WaitForChallengeStatus(timeout time.Duration) {
if r.verbose {
log.Print(" [INF] waiting for challenge status from chain")
}

r.setupTimeout(timeout)
r.chalConf.WaitForChallengeStatus = true
}

func (r *Runner) WaitForFileMetaRoot() {
if r.verbose {
log.Print(" [INF] waiting for file meta root")
}
count := 0
for name := range r.server.Nodes() {
if strings.Contains(string(name), "blobber") {
count++
}
}

f := fileMetaRoot{
shouldWait: true,
totalBlobers: count,
}
r.fileMetaRoot = f
}

func (r *Runner) CheckFileMetaRoot(cfg *config.CheckFileMetaRoot) error {
if r.verbose {
log.Print(" [INF] checking file meta root")
}

var fmrs []string
for _, fmr := range r.fileMetaRoot.fmrs {
fmrs = append(fmrs, fmr)
}

curFmr := fmrs[0]
allEqual := true
for i := 1; i < len(fmrs); i++ {
allEqual = allEqual && curFmr == fmrs[i]
curFmr = fmrs[i]
}

fmt.Printf("RequiredSameRoot = %v, allEqual = %v\n", cfg.RequireSameRoot, allEqual)

if cfg.RequireSameRoot && !allEqual {
return fmt.Errorf("required all file meta root to be same")
}

if !cfg.RequireSameRoot && allEqual {
return fmt.Errorf("required some file meta root to be different")
}

return nil
}

//
// Byzantine blockchain miners.
//
Expand Down Expand Up @@ -699,24 +793,24 @@ func (r *Runner) WaitNoViewChainge(wnvc config.WaitNoViewChainge,
}

// Command executing.
func (r *Runner) Command(name string, tm time.Duration) {
func (r *Runner) Command(name string, params map[string]string, tm time.Duration) {
r.setupTimeout(tm)

if r.verbose {
log.Printf(" [INF] command %q", name)
}

r.waitCommand = r.asyncCommand(name)
r.waitCommand = r.asyncCommand(name, params)
}

func (r *Runner) asyncCommand(name string) (reply chan error) {
func (r *Runner) asyncCommand(name string, params map[string]string) (reply chan error) {
reply = make(chan error)
go r.runAsyncCommand(reply, name)
go r.runAsyncCommand(reply, name, params)
return
}

func (r *Runner) runAsyncCommand(reply chan error, name string) {
var err = r.conf.Execute(name)
func (r *Runner) runAsyncCommand(reply chan error, name string, params map[string]string) {
var err = r.conf.Execute(name, params)
if err != nil {
err = fmt.Errorf("%q: %v", name, err)
}
Expand Down Expand Up @@ -936,6 +1030,23 @@ func (r *Runner) SetServerState(update interface{}) error {
state.CollectVerificationTicketsWhenMissedVRF = update
case *config.AdversarialAuthorizer:
state.AdversarialAuthorizer = update
case config.StopChallengeGeneration:
state.StopChallengeGeneration = bool(update)
case config.StopWMCommit:
state.StopWMCommit = true
case config.BlobberCommittedWM:
state.BlobberCommittedWM = true
case *config.GenerateChallege:
state.GenerateChallenge = update
case config.GetFileMetaRoot:
state.GetFileMetaRoot = bool(update)
case *config.RenameCommitControl:
if update.Fail {
state.FailRenameCommit = utils.SliceUnion(state.FailRenameCommit, update.Nodes)
} else {
state.FailRenameCommit = utils.SliceDifference(state.FailRenameCommit, update.Nodes)
}
fmt.Printf("state.FailRenameCommit = %v\n", state.FailRenameCommit)
}
})

Expand Down
Loading

0 comments on commit 0fb5e3b

Please sign in to comment.