diff --git a/cmd/start.go b/cmd/start.go index f9af98c971..0760858030 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -111,20 +111,23 @@ func Start(c *cli.Context) error { return err } } - hasMinimumStake, err := stakeMonitor.HasMinimumStake( - operatorPublicKey, - ) - if err != nil { - return fmt.Errorf("could not check the stake [%v]", err) - } - if !hasMinimumStake { - return fmt.Errorf( - "no minimum KEEP stake or operator is not authorized to use it; " + - "please make sure the operator address in the configuration " + - "is correct and it has KEEP tokens delegated and the operator " + - "contract has been authorized to operate on the stake", - ) - } + + // TODO: Disable the minimum stake check to be able to start the client + // without v1 contracts deployed. + // hasMinimumStake, err := stakeMonitor.HasMinimumStake( + // operatorPublicKey, + // ) + // if err != nil { + // return fmt.Errorf("could not check the stake [%v]", err) + // } + // if !hasMinimumStake { + // return fmt.Errorf( + // "no minimum KEEP stake or operator is not authorized to use it; " + + // "please make sure the operator address in the configuration " + + // "is correct and it has KEEP tokens delegated and the operator " + + // "contract has been authorized to operate on the stake", + // ) + // } netProvider, err := libp2p.Connect( ctx, diff --git a/pkg/beacon/beacon.go b/pkg/beacon/beacon.go index a7f88da2cd..d4491dad97 100644 --- a/pkg/beacon/beacon.go +++ b/pkg/beacon/beacon.go @@ -3,18 +3,16 @@ package beacon import ( "context" "encoding/hex" - "github.com/keep-network/keep-core/pkg/operator" "time" + "github.com/keep-network/keep-core/pkg/operator" + "github.com/ipfs/go-log" "github.com/keep-network/keep-common/pkg/persistence" "github.com/keep-network/keep-core/pkg/beacon/relay" relaychain "github.com/keep-network/keep-core/pkg/beacon/relay/chain" - dkgresult "github.com/keep-network/keep-core/pkg/beacon/relay/dkg/result" "github.com/keep-network/keep-core/pkg/beacon/relay/event" - "github.com/keep-network/keep-core/pkg/beacon/relay/gjkr" - "github.com/keep-network/keep-core/pkg/beacon/relay/groupselection" "github.com/keep-network/keep-core/pkg/beacon/relay/registry" "github.com/keep-network/keep-core/pkg/chain" "github.com/keep-network/keep-core/pkg/net" @@ -64,19 +62,7 @@ func Initialize( groupRegistry, ) - // We need to calculate group selection duration here as we can't do it - // inside the deduplicator due to import cycles. We don't include the - // time needed for publication as we are interested about the minimum - // possible off-chain group create protocol duration. - minGroupCreationDurationBlocks := - chainConfig.TicketSubmissionTimeout + - gjkr.ProtocolBlocks() + - dkgresult.PrePublicationBlocks() - - eventDeduplicator := event.NewDeduplicator( - relayChain, - minGroupCreationDurationBlocks, - ) + eventDeduplicator := event.NewDeduplicator(relayChain) node.ResumeSigningIfEligible(relayChain, signing) @@ -150,54 +136,32 @@ func Initialize( ) }) - _ = relayChain.OnGroupSelectionStarted(func(event *event.GroupSelectionStart) { - onGroupSelected := func(group *groupselection.Result) { - for index, staker := range group.SelectedStakers { - logger.Infof( - "new candidate group member [0x%v] with index [%v]", - hex.EncodeToString(staker), - index, - ) - } - node.JoinGroupIfEligible( - relayChain, - signing, - group, - event.NewEntry, - ) - } - + _ = relayChain.OnDKGStarted(func(event *event.DKGStarted) { go func() { - if ok := eventDeduplicator.NotifyGroupSelectionStarted( - event.BlockNumber, + if ok := eventDeduplicator.NotifyDKGStarted( + event.Seed, ); !ok { logger.Warningf( - "group selection event with seed [0x%x] and "+ + "DKG started event with seed [0x%x] and "+ "starting block [%v] has been already processed", - event.NewEntry, + event.Seed, event.BlockNumber, ) return } logger.Infof( - "group selection started with seed [0x%x] at block [%v]", - event.NewEntry, + "DKG started with seed [0x%x] at block [%v]", + event.Seed, event.BlockNumber, ) - err = groupselection.CandidateToNewGroup( + node.JoinDKGIfEligible( relayChain, - blockCounter, - chainConfig, - staker, - event.NewEntry, + signing, + event.Seed, event.BlockNumber, - onGroupSelected, ) - if err != nil { - logger.Errorf("tickets submission failed: [%v]", err) - } }() }) diff --git a/pkg/beacon/relay/chain/chain.go b/pkg/beacon/relay/chain/chain.go index 34788d9ae0..92eefef90d 100644 --- a/pkg/beacon/relay/chain/chain.go +++ b/pkg/beacon/relay/chain/chain.go @@ -10,6 +10,9 @@ import ( ) // StakerAddress represents chain-specific address of the staker. +// DEPRECATED +// TODO: The "staker" should probably become "operator" to reflect random +// beacon v2 structure. type StakerAddress []byte // GroupMemberIndex is an index of a threshold relay group member. @@ -52,21 +55,10 @@ type RelayEntryInterface interface { // GroupSelectionInterface defines the subset of the relay chain interface that // pertains to relay group selection activities. type GroupSelectionInterface interface { - // OnGroupSelectionStarted is a callback that is invoked when an on-chain - // group selection started and the contract is ready to accept tickets. - OnGroupSelectionStarted( - func(groupSelectionStarted *event.GroupSelectionStart), - ) subscription.EventSubscription - // SubmitTicket submits a ticket corresponding to the virtual staker to - // the chain, and returns a promise to track the submission. The promise - // is fulfilled with the entry as seen on-chain, or failed if there is an - // error submitting the entry. - SubmitTicket(ticket *Ticket) *async.EventGroupTicketSubmissionPromise - // GetSubmittedTickets gets the submitted group candidate tickets so far. - GetSubmittedTickets() ([]uint64, error) - // GetSelectedParticipants returns `GroupSize` slice of addresses of - // candidates which have been selected to the currently assembling group. - GetSelectedParticipants() ([]StakerAddress, error) + // SelectGroup returns the group members for the group generated by + // the given seed. This function can return an error if the relay chain's + // state does not allow for group selection at the moment. + SelectGroup(seed *big.Int) ([]StakerAddress, error) } // GroupRegistrationInterface defines the subset of the relay chain interface @@ -77,8 +69,11 @@ type GroupRegistrationInterface interface { OnGroupRegistered( func(groupRegistration *event.GroupRegistration), ) subscription.EventSubscription - // Checks if a group with the given public key is considered as - // stale on-chain. Group is considered as stale if it is expired and when + // IsGroupRegistered checks if group with the given public key is registered + // on-chain. + IsGroupRegistered(groupPublicKey []byte) (bool, error) + // IsStaleGroup checks if a group with the given public key is considered + // as stale on-chain. Group is considered as stale if it is expired and when // its expiration time and potentially executed operation timeout are both // in the past. Stale group is never selected by the chain to any new // operation. @@ -99,6 +94,11 @@ type GroupInterface interface { // interface that pertains specifically to group formation's distributed key // generation process. type DistributedKeyGenerationInterface interface { + // OnDKGStarted registers a callback that is invoked when an on-chain + // notification of the DKG process start is seen. + OnDKGStarted( + func(event *event.DKGStarted), + ) subscription.EventSubscription // SubmitDKGResult sends DKG result to a chain, along with signatures over // result hash from group participants supporting the result. // Signatures over DKG result hash are collected in a map keyed by signer's @@ -113,9 +113,6 @@ type DistributedKeyGenerationInterface interface { OnDKGResultSubmitted( func(event *event.DKGResultSubmission), ) subscription.EventSubscription - // IsGroupRegistered checks if group with the given public key is registered - // on-chain. - IsGroupRegistered(groupPublicKey []byte) (bool, error) // CalculateDKGResultHash calculates 256-bit hash of DKG result in standard // specific for the chain. Operation is performed off-chain. CalculateDKGResultHash(dkgResult *DKGResult) (DKGResultHash, error) @@ -147,9 +144,6 @@ type Config struct { // HonestThreshold is the minimum number of active participants behaving // according to the protocol needed to generate a new relay entry. HonestThreshold int - // TicketSubmissionTimeout is the duration (in blocks) the staker has to - // submit any tickets to candidate to a new group. - TicketSubmissionTimeout uint64 // ResultPublicationBlockStep is the duration (in blocks) that has to pass // before group member with the given index is eligible to submit the // result. diff --git a/pkg/beacon/relay/chain/group_ticket.go b/pkg/beacon/relay/chain/group_ticket.go deleted file mode 100644 index 4ec2cb6bc9..0000000000 --- a/pkg/beacon/relay/chain/group_ticket.go +++ /dev/null @@ -1,17 +0,0 @@ -package chain - -import ( - "math/big" -) - -// Ticket represents group selection ticket as seen on-chain. -type Ticket struct { - Value [8]byte // W_k - Proof *TicketProof -} - -// TicketProof represents group selection ticket proof as seen on-chain. -type TicketProof struct { - StakerValue *big.Int - VirtualStakerIndex *big.Int -} diff --git a/pkg/beacon/relay/event/deduplicator.go b/pkg/beacon/relay/event/deduplicator.go index a6ad4208df..f22fbcdcbf 100644 --- a/pkg/beacon/relay/event/deduplicator.go +++ b/pkg/beacon/relay/event/deduplicator.go @@ -3,8 +3,16 @@ package event import ( "encoding/hex" "fmt" + "github.com/keep-network/keep-common/pkg/cache" "math/big" "sync" + "time" +) + +const ( + // DKGSeedCachePeriod is the time period the cache maintains + // the DKG seed corresponding to a DKG instance. + DKGSeedCachePeriod = 7 * 24 * time.Hour ) // Local chain interface to avoid import cycles. @@ -24,14 +32,12 @@ type chain interface { // should be handled. // // Those events are supported: -// - group selection started +// - DKG started // - relay entry requested type Deduplicator struct { - chain chain - minGroupCreationDurationBlocks uint64 + chain chain - groupSelectionMutex sync.Mutex - currentGroupSelectionStartBlock uint64 + dkgSeedCache *cache.TimeCache relayEntryMutex sync.Mutex currentRequestStartBlock uint64 @@ -39,36 +45,32 @@ type Deduplicator struct { } // NewDeduplicator constructs a new Deduplicator instance. -func NewDeduplicator( - chain chain, - minGroupCreationDurationBlocks uint64, -) *Deduplicator { +func NewDeduplicator(chain chain) *Deduplicator { return &Deduplicator{ - chain: chain, - minGroupCreationDurationBlocks: minGroupCreationDurationBlocks, + chain: chain, + dkgSeedCache: cache.NewTimeCache(DKGSeedCachePeriod), } } -// NotifyGroupSelectionStarted notifies the client wants to start group -// selection upon receiving an event. It returns boolean indicating whether the +// NotifyDKGStarted notifies the client wants to start the distributed key +// generation upon receiving an event. It returns boolean indicating whether the // client should proceed with the execution or ignore the event as a duplicate. -func (d *Deduplicator) NotifyGroupSelectionStarted( - newGroupSelectionStartBlock uint64, +func (d *Deduplicator) NotifyDKGStarted( + newDKGSeed *big.Int, ) bool { - d.groupSelectionMutex.Lock() - defer d.groupSelectionMutex.Unlock() - - minCurrentGroupCreationEndBlock := d.currentGroupSelectionStartBlock + - d.minGroupCreationDurationBlocks - - shouldUpdate := d.currentGroupSelectionStartBlock == 0 || - newGroupSelectionStartBlock > minCurrentGroupCreationEndBlock - - if shouldUpdate { - d.currentGroupSelectionStartBlock = newGroupSelectionStartBlock + d.dkgSeedCache.Sweep() + + // The cache key is the hexadecimal representation of the seed. + cacheKey := newDKGSeed.Text(16) + // If the key is not in the cache, that means the seed was not handled + // yet and the client should proceed with the execution. + if !d.dkgSeedCache.Has(cacheKey) { + d.dkgSeedCache.Add(cacheKey) return true } + // Otherwise, the DKG seed is a duplicate and the client should not proceed + // with the execution. return false } @@ -122,7 +124,7 @@ func (d *Deduplicator) NotifyRelayEntryStarted( if newRequestPreviousEntry == hex.EncodeToString(currentRequestPreviousEntryOnChain[:]) && newRequestStartBlock == - currentRequestStartBlockOnChain.Uint64() { + currentRequestStartBlockOnChain.Uint64() { return true, nil } } else { diff --git a/pkg/beacon/relay/event/deduplicator_test.go b/pkg/beacon/relay/event/deduplicator_test.go index 5d2d4baf14..ad36ce9e5c 100644 --- a/pkg/beacon/relay/event/deduplicator_test.go +++ b/pkg/beacon/relay/event/deduplicator_test.go @@ -2,71 +2,53 @@ package event import ( "encoding/hex" + "github.com/keep-network/keep-common/pkg/cache" "math/big" "testing" + "time" ) -func TestStartGroupSelection_NoPriorGroupSelections(t *testing.T) { - chain := &testChain{ - currentRequestStartBlockValue: nil, - currentRequestPreviousEntryValue: []byte{}, - } - - // In case of first group selection for that node, the last group selection - // block number held by the deduplicator is zero. - deduplicator := NewDeduplicator( - chain, - 200, - ) +const testDKGSeedCachePeriod = 1 * time.Second - canGenerate := deduplicator.NotifyGroupSelectionStarted(5) - - if !canGenerate { - t.Fatal("should be allowed to start group selection") - } -} - -func TestStartGroupSelection_MinGroupSelectionDurationPassed(t *testing.T) { +func TestNotifyDKGStarted(t *testing.T) { chain := &testChain{ currentRequestStartBlockValue: nil, currentRequestPreviousEntryValue: []byte{}, } - deduplicator := NewDeduplicator( - chain, - 200, - ) - - // Simulate the last group selection occured at block 100 - deduplicator.NotifyGroupSelectionStarted(100) + deduplicator := &Deduplicator{ + chain: chain, + dkgSeedCache: cache.NewTimeCache(testDKGSeedCachePeriod), + } - // Group selection will be possible at block 100 + 200 + 1 = 301 - canGenerate := deduplicator.NotifyGroupSelectionStarted(301) + seed1 := big.NewInt(100) + seed2 := big.NewInt(200) - if !canGenerate { - t.Fatal("should be allowed to start group selection") + // Add the first seed. + canJoinDKG := deduplicator.NotifyDKGStarted(seed1) + if !canJoinDKG { + t.Fatal("should be allowed to join DKG") } -} -func TestStartGroupSelection_MinGroupSelectionDurationNotPassed(t *testing.T) { - chain := &testChain{ - currentRequestStartBlockValue: nil, - currentRequestPreviousEntryValue: []byte{}, + // Add the second seed. + canJoinDKG = deduplicator.NotifyDKGStarted(seed2) + if !canJoinDKG { + t.Fatal("should be allowed to join DKG") } - deduplicator := NewDeduplicator( - chain, - 200, - ) - - // Simulate the last group selection occured at block 100 - deduplicator.NotifyGroupSelectionStarted(100) + // Add the first seed before caching period elapses. + canJoinDKG = deduplicator.NotifyDKGStarted(seed1) + if canJoinDKG { + t.Fatal("should not be allowed to join DKG") + } - // Group selection will be possible at block 100 + 200 + 1 = 301 - canGenerate := deduplicator.NotifyGroupSelectionStarted(300) + // Wait until caching period elapses. + time.Sleep(testDKGSeedCachePeriod) - if canGenerate { - t.Fatal("should not be allowed to start group selection") + // Add the first seed again. + canJoinDKG = deduplicator.NotifyDKGStarted(seed1) + if !canJoinDKG { + t.Fatal("should be allowed to join DKG") } } @@ -78,7 +60,6 @@ func TestStartRelayEntry_NoPriorRelayEntries(t *testing.T) { deduplicator := NewDeduplicator( chain, - 200, ) canGenerate, err := deduplicator.NotifyRelayEntryStarted( @@ -102,7 +83,6 @@ func TestStartRelayEntry_LowerStartBlock(t *testing.T) { deduplicator := NewDeduplicator( chain, - 200, ) _, err := deduplicator.NotifyRelayEntryStarted(100, "01") @@ -131,7 +111,6 @@ func TestStartRelayEntry_HigherStartBlock_DifferentPreviousEntry(t *testing.T) { deduplicator := NewDeduplicator( chain, - 200, ) _, err := deduplicator.NotifyRelayEntryStarted(100, "01") @@ -165,7 +144,6 @@ func TestStartRelayEntry_HigherStartBlock_SamePreviousEntry_ConfirmedOnChain(t * deduplicator := NewDeduplicator( chain, - 200, ) _, err = deduplicator.NotifyRelayEntryStarted(100, "01") @@ -199,7 +177,6 @@ func TestStartRelayEntry_HigherStartBlock_SamePreviousEntry_PreviousEntryNotConf deduplicator := NewDeduplicator( chain, - 200, ) _, err = deduplicator.NotifyRelayEntryStarted(100, "01") @@ -233,7 +210,6 @@ func TestStartRelayEntry_HigherStartBlock_SamePreviousEntry_StartBlockNotConfirm deduplicator := NewDeduplicator( chain, - 200, ) _, err = deduplicator.NotifyRelayEntryStarted(100, "01") diff --git a/pkg/beacon/relay/event/event.go b/pkg/beacon/relay/event/event.go index b1e5d30029..412f89606a 100644 --- a/pkg/beacon/relay/event/event.go +++ b/pkg/beacon/relay/event/event.go @@ -28,16 +28,9 @@ type Request struct { BlockNumber uint64 } -// GroupSelectionStart represents a group selection start event. -type GroupSelectionStart struct { - NewEntry *big.Int - BlockNumber uint64 -} - -// GroupTicketSubmission represents a group selection ticket submission event. -type GroupTicketSubmission struct { - TicketValue *big.Int - +// DKGStarted represents a DKG start event. +type DKGStarted struct { + Seed *big.Int BlockNumber uint64 } diff --git a/pkg/beacon/relay/groupselection/groupselection.go b/pkg/beacon/relay/groupselection/groupselection.go deleted file mode 100644 index d49c6fd2d5..0000000000 --- a/pkg/beacon/relay/groupselection/groupselection.go +++ /dev/null @@ -1,293 +0,0 @@ -// Package groupselection implements the random beacon group selection protocol -// - an interactive, ticket-based method of selecting a candidate group from -// the set of all stakers given a pseudorandom seed value. -package groupselection - -import ( - "fmt" - "math/big" - "sort" - - "github.com/ipfs/go-log" - - relaychain "github.com/keep-network/keep-core/pkg/beacon/relay/chain" - "github.com/keep-network/keep-core/pkg/chain" -) - -var logger = log.Logger("keep-groupselection") - -// Recommended parameters all clients should use to minimize their expenses. -// It is not a must to obey but it is nice and polite. And being nice to others -// helps in reducing own costs because other clients should respect the same -// protocol. -const ( - // The number of blocks one round takes. - roundDuration = uint64(6) - - // The delay in blocks after all rounds complete to ensure all transactions - // are mined - miningLag = uint64(12) -) - -// Result represents the result of group selection protocol. It contains the -// list of all stakers selected to the candidate group as well as the number of -// block at which the group selection protocol completed. -type Result struct { - SelectedStakers []relaychain.StakerAddress - GroupSelectionEndBlock uint64 -} - -// CandidateToNewGroup attempts to generate and submit tickets for the -// staker to join a new group. -// -// To minimize the submitter's cost by minimizing the number of redundant -// tickets that are not selected into the group, tickets are submitted in -// N rounds, each round taking 6 blocks. -// As the basic principle, the number of leading zeros in the ticket -// value is subtracted from the number of rounds to determine the round -// the ticket should be submitted in: -// - in round 0, tickets with N or more leading zeros are submitted -// - in round 1, tickets with N-1 or more leading zeros are submitted -// (...) -// - in round N, tickets with no leading zeros are submitted. -// -// In each round, group member candidate needs to monitor tickets -// submitted by other candidates and compare them against tickets of -// the candidate not yet submitted to determine if continuing with -// ticket submission still makes sense. -// -// After the last round, there is a 12 blocks mining lag allowing all -// outstanding ticket submissions to have a higher chance of being -// mined before the deadline. -func CandidateToNewGroup( - relayChain relaychain.Interface, - blockCounter chain.BlockCounter, - chainConfig *relaychain.Config, - staker chain.Staker, - newEntry *big.Int, - startBlockHeight uint64, - onGroupSelected func(*Result), -) error { - availableStake, err := staker.Stake() - if err != nil { - return err - } - - minimumStake, err := relayChain.MinimumStake() - if err != nil { - return err - } - - tickets, err := generateTickets( - newEntry.Bytes(), - staker.Address(), - availableStake, - minimumStake, - ) - if err != nil { - return err - } - - logger.Infof("starting ticket submission with [%v] tickets", len(tickets)) - - err = submitTickets( - tickets, - relayChain, - blockCounter, - chainConfig, - startBlockHeight, - ) - if err != nil { - logger.Errorf("ticket submission terminated with error: [%v]", err) - } - - // Wait till the end of the ticket submission in case submitTickets failed - // in the middle and there is still a chance we qualified to a group. - ticketSubmissionTimeoutChannel, err := blockCounter.BlockHeightWaiter( - startBlockHeight + chainConfig.TicketSubmissionTimeout, - ) - if err != nil { - return err - } - - ticketSubmissionEndBlockHeight := <-ticketSubmissionTimeoutChannel - - logger.Infof( - "ticket submission ended at block [%v]", - ticketSubmissionEndBlockHeight, - ) - - selectedStakers, err := relayChain.GetSelectedParticipants() - if err != nil { - return fmt.Errorf( - "could not fetch selected participants "+ - "after submission timeout [%v]", - err, - ) - } - - go onGroupSelected(&Result{ - SelectedStakers: selectedStakers, - GroupSelectionEndBlock: ticketSubmissionEndBlockHeight, - }) - - return nil -} - -func submitTickets( - tickets []*ticket, - relayChain relaychain.GroupSelectionInterface, - blockCounter chain.BlockCounter, - chainConfig *relaychain.Config, - startBlockHeight uint64, -) error { - rounds, err := calculateRoundsCount(chainConfig.TicketSubmissionTimeout) - if err != nil { - return err - } - - for roundIndex := uint64(0); roundIndex <= rounds; roundIndex++ { - roundStartDelay := roundIndex * roundDuration - roundStartBlock := startBlockHeight + roundStartDelay - roundLeadingZeros := rounds - roundIndex - - logger.Infof( - "ticket submission round [%v] will start at "+ - "block [%v] and cover tickets with [%v] leading zeros", - roundIndex, - roundStartBlock, - roundLeadingZeros, - ) - - err := blockCounter.WaitForBlockHeight(roundStartBlock) - if err != nil { - return err - } - - candidateTickets, err := roundCandidateTickets( - relayChain, - tickets, - roundIndex, - roundLeadingZeros, - chainConfig.GroupSize, - ) - if err != nil { - return err - } - - logger.Infof( - "ticket submission round [%v] submitting "+ - "[%v] tickets", - roundIndex, - len(candidateTickets), - ) - - submitTicketsOnChain(candidateTickets, relayChain) - } - - return nil -} - -// calculateRoundsCount takes the on-chain ticket submission timeout -// and calculates the number of rounds for ticket submission. If it is not -// possible to use the recommended round duration and mining lag because the -// supplied timeout is too short, function returns an error. -func calculateRoundsCount(submissionTimeout uint64) (uint64, error) { - if submissionTimeout-miningLag <= roundDuration { - return 0, fmt.Errorf("submission timeout is too short") - } - - return (submissionTimeout - miningLag) / roundDuration, nil -} - -// roundCandidateTickets returns tickets which should be submitted in -// the given ticket submission round. -// -// Bear in mind that member tickets slice should be sorted in ascending -// order by their value. -func roundCandidateTickets( - relayChain relaychain.GroupSelectionInterface, - memberTickets []*ticket, - roundIndex uint64, - roundLeadingZeros uint64, - groupSize int, -) ([]*ticket, error) { - // Get unsorted submitted tickets from the chain. - // This slice will be also filled by candidate tickets values in order to - // compare subsequent member ticket values against all submitted tickets - // so far and determine an optimal number of candidate tickets. - submittedTickets, err := relayChain.GetSubmittedTickets() - if err != nil { - return nil, fmt.Errorf( - "could not get submitted tickets: [%v]", - err, - ) - } - - candidateTickets := make([]*ticket, 0) - - for _, candidateTicket := range memberTickets { - candidateTicketLeadingZeros := uint64( - candidateTicket.leadingZeros(), - ) - - // Check if the given candidate ticket should be proceeded in - // the current round. - if roundIndex == 0 { - if candidateTicketLeadingZeros < roundLeadingZeros { - continue - } - } else { - if candidateTicketLeadingZeros != roundLeadingZeros { - continue - } - } - - // Sort submitted tickets slice in ascending order. - sort.SliceStable( - submittedTickets, - func(i, j int) bool { - return submittedTickets[i] < submittedTickets[j] - }, - ) - - shouldBeSubmitted := false - candidateTicketValue := candidateTicket.intValue().Uint64() - - if len(submittedTickets) < groupSize { - // If the submitted tickets count is less than the group - // size the candidate ticket can be added unconditionally. - submittedTickets = append( - submittedTickets, - candidateTicketValue, - ) - shouldBeSubmitted = true - } else { - // If the submitted tickets count is equal to the group - // size the candidate ticket can be added only if - // it is smaller than the highest submitted ticket. - // Note that, maximum length of submitted tickets slice - // will be exceeded and will be trimmed in next - // iteration. - highestSubmittedTicket := submittedTickets[len(submittedTickets)-1] - if candidateTicketValue < highestSubmittedTicket { - submittedTickets[len(submittedTickets)-1] = candidateTicketValue - shouldBeSubmitted = true - } - } - - // If current candidate ticket should not be submitted, - // there is no sense to continue with next candidate tickets - // because they will have higher value than the current one. - if !shouldBeSubmitted { - break - } - - candidateTickets = append( - candidateTickets, - candidateTicket, - ) - } - - return candidateTickets, nil -} diff --git a/pkg/beacon/relay/groupselection/groupselection_test.go b/pkg/beacon/relay/groupselection/groupselection_test.go deleted file mode 100644 index 6d70cbba69..0000000000 --- a/pkg/beacon/relay/groupselection/groupselection_test.go +++ /dev/null @@ -1,372 +0,0 @@ -package groupselection - -import ( - "encoding/binary" - "math/big" - "reflect" - "sort" - "testing" - - "github.com/keep-network/keep-core/pkg/beacon/relay/chain" - "github.com/keep-network/keep-core/pkg/beacon/relay/event" - "github.com/keep-network/keep-core/pkg/chain/local" - "github.com/keep-network/keep-core/pkg/gen/async" - "github.com/keep-network/keep-core/pkg/subscription" -) - -func TestSubmitTickets(t *testing.T) { - var tests = map[string]struct { - groupSize int - tickets []*ticket - expectedSubmittedTickets []uint64 - }{ - // Client has the same number of tickets as the group size. - // All tickets should be submitted to the chain. - "the same number of tickets as group size": { - groupSize: 4, - tickets: []*ticket{ - newTestTicket(1, 1001), - newTestTicket(2, 1002), - newTestTicket(3, 1003), - newTestTicket(4, 1004), - }, - expectedSubmittedTickets: []uint64{1001, 1002, 1003, 1004}, - }, - // Client has more tickets than the group size. - // Only #group_size of tickets should be submitted to the chain. - "more tickets than group size": { - groupSize: 2, - tickets: []*ticket{ - newTestTicket(1, 1001), - newTestTicket(2, 1002), - newTestTicket(3, 1003), - newTestTicket(4, 1004), - }, - expectedSubmittedTickets: []uint64{1001, 1002}, - }, - // Client has less tickets than the group size. - // All tickets should be submitted to the chain. - "less tickets than the group size": { - groupSize: 5, - tickets: []*ticket{ - newTestTicket(1, 1001), - newTestTicket(2, 1002), - }, - expectedSubmittedTickets: []uint64{1001, 1002}, - }, - } - - for testName, test := range tests { - t.Run(testName, func(t *testing.T) { - chainConfig := &chain.Config{ - GroupSize: test.groupSize, - TicketSubmissionTimeout: 24, - } - - chain := &stubGroupInterface{ - groupSize: test.groupSize, - } - - blockCounter, err := local.BlockCounter() - if err != nil { - t.Fatal(err) - } - - err = submitTickets( - test.tickets, - chain, - blockCounter, - chainConfig, - 0, // start block height - ) - if err != nil { - t.Fatal(err) - } - - err = blockCounter.WaitForBlockHeight( - chainConfig.TicketSubmissionTimeout, - ) - if err != nil { - t.Fatal(err) - } - - submittedTickets, err := chain.GetSubmittedTickets() - if err != nil { - t.Fatal(err) - } - - if !reflect.DeepEqual(test.expectedSubmittedTickets, submittedTickets) { - t.Fatalf( - "unexpected submitted tickets\nexpected: [%v]\nactual: [%v]", - test.expectedSubmittedTickets, - submittedTickets, - ) - } - }) - } -} - -func TestRoundCandidateTickets(t *testing.T) { - groupSize := 9 - rounds := uint64(7) - - tickets := []*ticket{ - newTestTicket(1, 36028797018963968), - newTestTicket(2, 72057594037927936), - newTestTicket(3, 144115188075855872), - newTestTicket(4, 288230376151711744), - newTestTicket(5, 576460752303423488), - newTestTicket(6, 1152921504606846976), - newTestTicket(7, 2305843009213693952), - newTestTicket(8, 4611686018427387904), - newTestTicket(9, 9223372036854775808), - } - - var tests = map[string]struct { - existingChainTickets []uint64 - expectedCandidateTicketsPerRound map[uint64][]*ticket - }{ - "no existing chain tickets - all tickets should be submitted": { - existingChainTickets: []uint64{}, - expectedCandidateTicketsPerRound: map[uint64][]*ticket{ - 0: {tickets[0], tickets[1]}, - 1: {tickets[2]}, - 2: {tickets[3]}, - 3: {tickets[4]}, - 4: {tickets[5]}, - 5: {tickets[6]}, - 6: {tickets[7]}, - 7: {tickets[8]}, - }, - }, - "better chain tickets exists and their count is below the group size - " + - "only best tickets should be submitted": { - existingChainTickets: []uint64{1000, 1001, 1002, 1003}, - expectedCandidateTicketsPerRound: map[uint64][]*ticket{ - 0: {tickets[0], tickets[1]}, - 1: {tickets[2]}, - 2: {tickets[3]}, - 3: {tickets[4]}, - 4: {}, - 5: {}, - 6: {}, - 7: {}, - }, - }, - "better chain tickets exists and their count is equal the group size - " + - "no tickets should be submitted": { - existingChainTickets: []uint64{ - 1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, - }, - expectedCandidateTicketsPerRound: map[uint64][]*ticket{ - 0: {}, 1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, - }, - }, - "worse chain tickets exists and their count is below the group size - " + - "all tickets should be submitted": { - existingChainTickets: []uint64{ - 9223372036854775809, - 9223372036854775810, - 9223372036854775811, - 9223372036854775812, - }, - expectedCandidateTicketsPerRound: map[uint64][]*ticket{ - 0: {tickets[0], tickets[1]}, - 1: {tickets[2]}, - 2: {tickets[3]}, - 3: {tickets[4]}, - 4: {tickets[5]}, - 5: {tickets[6]}, - 6: {tickets[7]}, - 7: {tickets[8]}, - }, - }, - "worse chain tickets exists and their count is equal the group size - " + - "all tickets should be submitted": { - existingChainTickets: []uint64{ - 9223372036854775809, - 9223372036854775810, - 9223372036854775811, - 9223372036854775812, - 9223372036854775813, - 9223372036854775814, - 9223372036854775815, - 9223372036854775816, - 9223372036854775817, - }, - expectedCandidateTicketsPerRound: map[uint64][]*ticket{ - 0: {tickets[0], tickets[1]}, - 1: {tickets[2]}, - 2: {tickets[3]}, - 3: {tickets[4]}, - 4: {tickets[5]}, - 5: {tickets[6]}, - 6: {tickets[7]}, - 7: {tickets[8]}, - }, - }, - "better and worse chain tickets exists and their count is below the group size - " + - "only best tickets should be submitted": { - existingChainTickets: []uint64{ - 1000, - 1001, - 1002, - 9223372036854775809, - 9223372036854775810, - 9223372036854775811, - }, - expectedCandidateTicketsPerRound: map[uint64][]*ticket{ - 0: {tickets[0], tickets[1]}, - 1: {tickets[2]}, - 2: {tickets[3]}, - 3: {tickets[4]}, - 4: {tickets[5]}, - 5: {}, - 6: {}, - 7: {}, - }, - }, - "better and worse chain tickets exists and their count is equal the group size - " + - "only best tickets should be submitted": { - existingChainTickets: []uint64{ - 1000, - 1001, - 1002, - 1003, - 9223372036854775809, - 9223372036854775810, - 9223372036854775811, - 9223372036854775812, - 9223372036854775813, - }, - expectedCandidateTicketsPerRound: map[uint64][]*ticket{ - 0: {tickets[0], tickets[1]}, - 1: {tickets[2]}, - 2: {tickets[3]}, - 3: {tickets[4]}, - 4: {}, - 5: {}, - 6: {}, - 7: {}, - }, - }, - } - - for testName, test := range tests { - t.Run(testName, func(t *testing.T) { - existingChainTickets := make([]*chain.Ticket, 0) - for _, existingChainTicket := range test.existingChainTickets { - chainTicket, err := toChainTicket( - newTestTicket(0, existingChainTicket), - ) - if err != nil { - t.Fatal(err) - } - - existingChainTickets = append(existingChainTickets, chainTicket) - } - - relayChain := &stubGroupInterface{ - groupSize: groupSize, - submittedTickets: existingChainTickets, - } - - for roundIndex := uint64(0); roundIndex <= rounds; roundIndex++ { - roundLeadingZeros := rounds - roundIndex - - candidateTickets, err := roundCandidateTickets( - relayChain, - tickets, - roundIndex, - roundLeadingZeros, - groupSize, - ) - if err != nil { - t.Fatal(err) - } - - if !reflect.DeepEqual( - test.expectedCandidateTicketsPerRound[roundIndex], - candidateTickets, - ) { - t.Fatalf( - "unexpected candidate tickets for round [%v]\n"+ - "expected: [%v]\nactual: [%v]", - roundIndex, - test.expectedCandidateTicketsPerRound[roundIndex], - candidateTickets, - ) - } - - // Candidate tickets must be submitted because next round - // will get submitted tickets from the mock chain and use - // them to determine an optimal number of their candidate - // tickets. - for _, ticket := range candidateTickets { - chainTicket, err := toChainTicket(ticket) - if err != nil { - t.Fatal(err) - } - - relayChain.SubmitTicket(chainTicket) - } - } - }) - } -} - -type stubGroupInterface struct { - groupSize int - submittedTickets []*chain.Ticket -} - -func (stg *stubGroupInterface) SubmitTicket(ticket *chain.Ticket) *async.EventGroupTicketSubmissionPromise { - promise := &async.EventGroupTicketSubmissionPromise{} - - stg.submittedTickets = append(stg.submittedTickets, ticket) - - sort.SliceStable(stg.submittedTickets, func(i, j int) bool { - // Ticket value bytes are interpreted as a big-endian unsigned integers. - iValue := new(big.Int).SetBytes(stg.submittedTickets[i].Value[:]) - jValue := new(big.Int).SetBytes(stg.submittedTickets[j].Value[:]) - - return iValue.Cmp(jValue) == -1 - }) - - if len(stg.submittedTickets) > stg.groupSize { - stg.submittedTickets = stg.submittedTickets[:stg.groupSize] - } - - _ = promise.Fulfill(&event.GroupTicketSubmission{ - TicketValue: new(big.Int).SetBytes(ticket.Value[:]), - BlockNumber: 222, - }) - - return promise -} - -func (stg *stubGroupInterface) GetSubmittedTickets() ([]uint64, error) { - tickets := make([]uint64, len(stg.submittedTickets)) - - for i := range tickets { - // Ticket bytes should be interpreted as a big-endian unsigned integers. - tickets[i] = binary.BigEndian.Uint64(stg.submittedTickets[i].Value[:]) - } - - return tickets, nil -} - -func (stg *stubGroupInterface) GetSelectedParticipants() ([]chain.StakerAddress, error) { - selected := make([]chain.StakerAddress, stg.groupSize) - for i := 0; i < stg.groupSize; i++ { - selected[i] = []byte("whatever") - } - - return selected, nil -} - -func (stg *stubGroupInterface) OnGroupSelectionStarted( - func(groupSelectionStart *event.GroupSelectionStart), -) subscription.EventSubscription { - panic("not implemented") -} diff --git a/pkg/beacon/relay/groupselection/ticket.go b/pkg/beacon/relay/groupselection/ticket.go deleted file mode 100644 index c102d96a87..0000000000 --- a/pkg/beacon/relay/groupselection/ticket.go +++ /dev/null @@ -1,93 +0,0 @@ -package groupselection - -import ( - "math/big" - "math/bits" - - "fmt" - - "github.com/ethereum/go-ethereum/crypto" - "github.com/keep-network/keep-core/pkg/internal/byteutils" -) - -// ticket is a message containing a pseudorandomly generated value, W_k, which is -// used to determine whether a given virtual staker is eligible for the group P -// (the lowest N tickets will be chosen) and a proof of the validity of the value. -// Ticket value bytes should be interpreted as a big-endian unsigned integer. -type ticket struct { - value [8]byte // W_k - proof *proof // proof_k = Proof(Q_j, vs) -} - -// proof consists of the components needed to construct the ticket's value, and -// also acts as evidence for an accusing challenge against the ticket's value. -type proof struct { - stakerValue []byte // Q_j, a staker-specific value - virtualStakerIndex *big.Int // vs -} - -// newTicket calculates a ticket value and returns the ticket with -// the associated proof. -func newTicket( - beaconOutput []byte, // V_i - stakerValue []byte, // Q_j - virtualStakerIndex *big.Int, // vs -) (*ticket, error) { - value, err := calculateTicketValue(beaconOutput, stakerValue, virtualStakerIndex) - if err != nil { - return nil, fmt.Errorf("ticket value calculation failed [%v]", err) - } - - return &ticket{ - value: value, - proof: &proof{ - stakerValue: stakerValue, - virtualStakerIndex: virtualStakerIndex, - }, - }, nil -} - -// calculateTicketValue generates a shaValue from the previous beacon output, -// the staker-specific value, and the virtual staker index and returns the -// first 8 bytes as ticket value. -func calculateTicketValue( - beaconOutput []byte, - stakerValue []byte, - virtualStakerIndex *big.Int, -) ([8]byte, error) { - var combinedValue []byte - var ticketValue [8]byte - - beaconOutputPadded, err := byteutils.LeftPadTo32Bytes(beaconOutput) - if err != nil { - return ticketValue, fmt.Errorf("cannot pad a becon output, [%v]", err) - } - - stakerValuePadded, err := byteutils.LeftPadTo32Bytes(stakerValue) - if err != nil { - return ticketValue, fmt.Errorf("cannot pad a staker value, [%v]", err) - } - - virtualStakerIndexPadded, err := byteutils.LeftPadTo32Bytes(virtualStakerIndex.Bytes()) - if err != nil { - return ticketValue, fmt.Errorf("cannot pad a virtual staker index, [%v]", err) - } - - combinedValue = append(combinedValue, beaconOutputPadded...) - combinedValue = append(combinedValue, stakerValuePadded...) - combinedValue = append(combinedValue, virtualStakerIndexPadded...) - - copy(ticketValue[:], crypto.Keccak256(combinedValue[:])[:8]) - - return ticketValue, nil -} - -// Returns ticket value as a big integer. -// Ticket value bytes are interpreted as a big-endian unsigned integer. -func (t *ticket) intValue() *big.Int { - return new(big.Int).SetBytes(t.value[:]) -} - -func (t *ticket) leadingZeros() int { - return bits.LeadingZeros64(t.intValue().Uint64()) -} diff --git a/pkg/beacon/relay/groupselection/ticket_generate.go b/pkg/beacon/relay/groupselection/ticket_generate.go deleted file mode 100644 index 9894795581..0000000000 --- a/pkg/beacon/relay/groupselection/ticket_generate.go +++ /dev/null @@ -1,32 +0,0 @@ -package groupselection - -import ( - "math/big" - "sort" -) - -// generateTickets generates a set of tickets for the given staker and relay -// entry value given the specified stake parameters and natural threshold. -// -// Tickets are returned sorted in ascending order by their value. -func generateTickets( - beaconValue []byte, // V_i - stakerValue []byte, // Q_j - availableStake *big.Int, // S_j - minimumStake *big.Int, -) ([]*ticket, error) { - stakingWeight := new(big.Int).Quo(availableStake, minimumStake) // W_j - - tickets := make([]*ticket, 0) - for virtualStaker := int64(1); virtualStaker <= stakingWeight.Int64(); virtualStaker++ { - ticket, err := newTicket(beaconValue, stakerValue, big.NewInt(virtualStaker)) - if err != nil { - return nil, err - } - tickets = append(tickets, ticket) - } - - sort.Stable(byValue(tickets)) - - return tickets, nil -} diff --git a/pkg/beacon/relay/groupselection/ticket_generate_test.go b/pkg/beacon/relay/groupselection/ticket_generate_test.go deleted file mode 100644 index efefadba80..0000000000 --- a/pkg/beacon/relay/groupselection/ticket_generate_test.go +++ /dev/null @@ -1,60 +0,0 @@ -package groupselection - -import ( - "math/big" - "testing" -) - -var stakingAddress = []byte("staking address") -var previousBeaconOutput = []byte("test beacon output") - -func TestAllTicketsGenerated(t *testing.T) { - minimumStake := big.NewInt(20) - availableStake := big.NewInt(1000) - virtualStakers := availableStake.Int64() / minimumStake.Int64() - - tickets, err := generateTickets( - previousBeaconOutput, - stakingAddress, - availableStake, - minimumStake, - ) - if err != nil { - t.Fatal(err) - } - - // We should have 1000/20 = 50 tickets - allTicketsCount := len(tickets) - if allTicketsCount != int(virtualStakers) { - t.Fatalf( - "expected [%d] tickets, has [%d] tickets", - virtualStakers, - allTicketsCount, - ) - } -} - -func TestTicketsGeneratedInOrder(t *testing.T) { - minimumStake := big.NewInt(1) - availableStake := big.NewInt(100) - - tickets, err := generateTickets( - previousBeaconOutput, - stakingAddress, - availableStake, - minimumStake, - ) - if err != nil { - t.Fatal(err) - } - - // Tickets should be sorted in ascending order - for i := 0; i < len(tickets)-1; i++ { - value := tickets[i].intValue() - nextValue := tickets[i+1].intValue() - - if value.Cmp(nextValue) > 0 { - t.Errorf("tickets not sorted in ascending order") - } - } -} diff --git a/pkg/beacon/relay/groupselection/ticket_sort.go b/pkg/beacon/relay/groupselection/ticket_sort.go deleted file mode 100644 index ab2c75ad90..0000000000 --- a/pkg/beacon/relay/groupselection/ticket_sort.go +++ /dev/null @@ -1,19 +0,0 @@ -package groupselection - -// byValue implements sort.Interface sorting tickets by their value. -type byValue []*ticket - -// Len is the sort.Interface requirement for ticket ordering. -func (bv byValue) Len() int { - return len(bv) -} - -// Swap is the sort.Interface requirement for ticket ordering. -func (bv byValue) Swap(i, j int) { - bv[i], bv[j] = bv[j], bv[i] -} - -// Less is the sort.Interface requirement for ticket ordering. -func (bv byValue) Less(i, j int) bool { - return bv[i].intValue().Cmp(bv[j].intValue()) < 1 -} diff --git a/pkg/beacon/relay/groupselection/ticket_sort_test.go b/pkg/beacon/relay/groupselection/ticket_sort_test.go deleted file mode 100644 index 42b9324fd8..0000000000 --- a/pkg/beacon/relay/groupselection/ticket_sort_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package groupselection - -import ( - "encoding/binary" - "math/big" - "reflect" - "sort" - "testing" -) - -func TestSortByValue(t *testing.T) { - ticket1 := newTestTicket(5, 1001) - ticket2 := newTestTicket(4, 1002) - ticket3 := newTestTicket(1, 1003) - ticket4 := newTestTicket(3, 1004) - ticket5 := newTestTicket(2, 1005) - - tickets := []*ticket{ - ticket3, - ticket5, - ticket4, - ticket1, - ticket2, - } - - sort.Stable(byValue(tickets)) - - assertTicketAtIndex(t, tickets, 0, ticket1) - assertTicketAtIndex(t, tickets, 1, ticket2) - assertTicketAtIndex(t, tickets, 2, ticket3) - assertTicketAtIndex(t, tickets, 3, ticket4) - assertTicketAtIndex(t, tickets, 4, ticket5) -} - -func assertTicketAtIndex(t *testing.T, tickets []*ticket, index int, ticket *ticket) { - if !reflect.DeepEqual(ticket, tickets[index]) { - t.Errorf( - "unexpected ticket at index [%v]\nexpected: [%+v]\nactual: [%+v]", - index, - ticket, - tickets[index], - ) - } -} - -func newTestTicket(virtualStakerIndex uint32, value uint64) *ticket { - var bytes [8]byte - binary.BigEndian.PutUint64(bytes[:], value) - - return &ticket{ - value: bytes, - proof: &proof{ - virtualStakerIndex: big.NewInt(int64(virtualStakerIndex)), - }, - } -} diff --git a/pkg/beacon/relay/groupselection/ticket_submission.go b/pkg/beacon/relay/groupselection/ticket_submission.go deleted file mode 100644 index f7c47e8b6b..0000000000 --- a/pkg/beacon/relay/groupselection/ticket_submission.go +++ /dev/null @@ -1,43 +0,0 @@ -package groupselection - -import ( - "math/big" - - relaychain "github.com/keep-network/keep-core/pkg/beacon/relay/chain" -) - -// submitTicketsOnChain submits tickets to the chain. -func submitTicketsOnChain( - tickets []*ticket, - relayChain relaychain.GroupSelectionInterface, -) { - for _, ticket := range tickets { - chainTicket, err := toChainTicket(ticket) - if err != nil { - logger.Errorf( - "could not transform ticket to chain format: [%v]", - err, - ) - continue - } - - relayChain.SubmitTicket(chainTicket).OnFailure( - func(err error) { - logger.Errorf( - "ticket submission failed: [%v]", - err, - ) - }, - ) - } -} - -func toChainTicket(ticket *ticket) (*relaychain.Ticket, error) { - return &relaychain.Ticket{ - Value: ticket.value, - Proof: &relaychain.TicketProof{ - StakerValue: new(big.Int).SetBytes(ticket.proof.stakerValue), - VirtualStakerIndex: ticket.proof.virtualStakerIndex, - }, - }, nil -} diff --git a/pkg/beacon/relay/groupselection/ticket_submission_test.go b/pkg/beacon/relay/groupselection/ticket_submission_test.go deleted file mode 100644 index 7f83b082bc..0000000000 --- a/pkg/beacon/relay/groupselection/ticket_submission_test.go +++ /dev/null @@ -1,98 +0,0 @@ -package groupselection - -import ( - "math/big" - "reflect" - "testing" - - "github.com/keep-network/keep-core/pkg/beacon/relay/chain" - "github.com/keep-network/keep-core/pkg/beacon/relay/event" - "github.com/keep-network/keep-core/pkg/gen/async" - "github.com/keep-network/keep-core/pkg/subscription" -) - -func TestSubmitTicketsOnChain(t *testing.T) { - beaconOutput := big.NewInt(10).Bytes() - stakerValue := []byte("StakerValue1001") - - tickets := make([]*ticket, 0) - for i := 1; i <= 4; i++ { - ticket, _ := newTicket(beaconOutput, stakerValue, big.NewInt(int64(i))) - tickets = append(tickets, ticket) - } - - submittedTickets := make([]*chain.Ticket, 0) - - mockInterface := &mockGroupInterface{ - mockSubmitTicketFn: func(t *chain.Ticket) *async.EventGroupTicketSubmissionPromise { - submittedTickets = append(submittedTickets, t) - promise := &async.EventGroupTicketSubmissionPromise{} - promise.Fulfill(&event.GroupTicketSubmission{ - TicketValue: new(big.Int).SetBytes(t.Value[:]), - BlockNumber: 111, - }) - return promise - }, - } - - submitTicketsOnChain(tickets, mockInterface) - - if len(tickets) != len(submittedTickets) { - t.Errorf( - "unexpected number of tickets submitted\nexpected: [%v]\nactual: [%v]", - len(tickets), - len(submittedTickets), - ) - } - - for i, ticket := range tickets { - submitted := fromChainTicket(submittedTickets[i]) - - if !reflect.DeepEqual(ticket, submitted) { - t.Errorf( - "unexpected ticket at index [%v]\nexpected: [%v]\nactual: [%v]", - i, - ticket, - submitted, - ) - } - } -} - -func fromChainTicket(chainTicket *chain.Ticket) *ticket { - return &ticket{ - value: chainTicket.Value, - proof: &proof{ - stakerValue: chainTicket.Proof.StakerValue.Bytes(), - virtualStakerIndex: chainTicket.Proof.VirtualStakerIndex, - }, - } -} - -type mockGroupInterface struct { - mockSubmitTicketFn func(t *chain.Ticket) *async.EventGroupTicketSubmissionPromise -} - -func (mgi *mockGroupInterface) SubmitTicket( - ticket *chain.Ticket, -) *async.EventGroupTicketSubmissionPromise { - if mgi.mockSubmitTicketFn != nil { - return mgi.mockSubmitTicketFn(ticket) - } - - panic("unexpected") -} - -func (mgi *mockGroupInterface) GetSubmittedTickets() ([]uint64, error) { - panic("not implemented") -} - -func (mgi *mockGroupInterface) GetSelectedParticipants() ([]chain.StakerAddress, error) { - panic("unexpected") -} - -func (mgi *mockGroupInterface) OnGroupSelectionStarted( - func(groupSelectionStart *event.GroupSelectionStart), -) subscription.EventSubscription { - panic("not implemented") -} diff --git a/pkg/beacon/relay/node.go b/pkg/beacon/relay/node.go index 17c0c9d219..5872dd93cc 100644 --- a/pkg/beacon/relay/node.go +++ b/pkg/beacon/relay/node.go @@ -15,7 +15,6 @@ import ( relaychain "github.com/keep-network/keep-core/pkg/beacon/relay/chain" "github.com/keep-network/keep-core/pkg/beacon/relay/dkg" - "github.com/keep-network/keep-core/pkg/beacon/relay/groupselection" "github.com/keep-network/keep-core/pkg/beacon/relay/registry" "github.com/keep-network/keep-core/pkg/chain" "github.com/keep-network/keep-core/pkg/net" @@ -43,43 +42,59 @@ func (n *Node) IsInGroup(groupPublicKey []byte) bool { return len(n.groupRegistry.GetGroup(groupPublicKey)) > 0 } -// JoinGroupIfEligible takes a threshold relay entry value and undergoes the -// process of joining a group if this node's virtual stakers prove eligible for -// the group generated by that entry. This is an interactive on-chain process, -// and JoinGroupIfEligible can block for an extended period of time while it +// JoinDKGIfEligible takes a seed value and undergoes the process of the +// distributed key generation if this node's operator proves to be eligible for +// the group generated by that seed. This is an interactive on-chain process, +// and JoinDKGIfEligible can block for an extended period of time while it // completes the on-chain operation. -// -// Indirectly, the completion of the process is signaled by the formation of an -// on-chain group containing at least one of this node's virtual stakers. -func (n *Node) JoinGroupIfEligible( +func (n *Node) JoinDKGIfEligible( relayChain relaychain.Interface, signing chain.Signing, - groupSelectionResult *groupselection.Result, - newEntry *big.Int, + dkgSeed *big.Int, + dkgStartBlockNumber uint64, ) { - dkgStartBlockHeight := groupSelectionResult.GroupSelectionEndBlock + logger.Infof( + "checking eligibility for DKG with seed [0x%x]", + dkgSeed, + ) - if len(groupSelectionResult.SelectedStakers) > maxGroupSize { + groupMembers, err := relayChain.SelectGroup(dkgSeed) + if err != nil { + logger.Errorf( + "failed to select group with seed [0x%x]: [%v]", + dkgSeed, + err, + ) + return + } + + if len(groupMembers) > maxGroupSize { logger.Errorf( "group size larger than supported: [%v]", - len(groupSelectionResult.SelectedStakers), + len(groupMembers), ) return } indexes := make([]uint8, 0) - for index, selectedStaker := range groupSelectionResult.SelectedStakers { + for index, groupMember := range groupMembers { // See if we are amongst those chosen - if bytes.Compare(selectedStaker, n.Staker.Address()) == 0 { + if bytes.Compare(groupMember, n.Staker.Address()) == 0 { indexes = append(indexes, uint8(index)) } } // create temporary broadcast channel name for DKG using the // group selection seed - channelName := newEntry.Text(16) + channelName := dkgSeed.Text(16) if len(indexes) > 0 { + logger.Infof( + "joining DKG with seed [0x%x] and controlling [%v] group members", + dkgSeed, + len(indexes), + ) + broadcastChannel, err := n.netProvider.BroadcastChannelFor(channelName) if err != nil { logger.Errorf("failed to get broadcast channel: [%v]", err) @@ -87,7 +102,7 @@ func (n *Node) JoinGroupIfEligible( } membershipValidator := group.NewStakersMembershipValidator( - groupSelectionResult.SelectedStakers, + groupMembers, signing, ) @@ -101,17 +116,17 @@ func (n *Node) JoinGroupIfEligible( } for _, index := range indexes { - // capture player index for goroutine + // Capture the player index for the goroutine. playerIndex := index go func() { signer, err := dkg.ExecuteDKG( - newEntry, + dkgSeed, playerIndex, n.chainConfig.GroupSize, n.chainConfig.DishonestThreshold(), membershipValidator, - dkgStartBlockHeight, + dkgStartBlockNumber, n.blockCounter, relayChain, signing, @@ -122,23 +137,31 @@ func (n *Node) JoinGroupIfEligible( return } - // final broadcast channel name for group is the compressed - // public key of the group - channelName := hex.EncodeToString( + groupPublicKey := hex.EncodeToString( signer.GroupPublicKeyBytesCompressed(), ) - err = n.groupRegistry.RegisterGroup(signer, channelName) + // TODO: Consider snapshotting the key material just in case. + err = n.groupRegistry.RegisterGroup(signer, groupPublicKey) if err != nil { - logger.Errorf("failed to register a group: [%v]", err) + logger.Errorf( + "[member:%v] failed to register a group [%v]: [%v]", + signer.MemberID(), + groupPublicKey, + err, + ) + return } logger.Infof( - "[member:%v] ready to operate in the group", + "[member:%v] group [%v] registered successfully", signer.MemberID(), + groupPublicKey, ) }() } + } else { + logger.Infof("not eligible for DKG with seed [0x%x]", dkgSeed) } return diff --git a/pkg/beacon/relay/registry/groups_test.go b/pkg/beacon/relay/registry/groups_test.go index 3792ad9fe8..ff61ed461b 100644 --- a/pkg/beacon/relay/registry/groups_test.go +++ b/pkg/beacon/relay/registry/groups_test.go @@ -118,7 +118,7 @@ func TestLoadGroup(t *testing.T) { func TestUnregisterStaleGroups(t *testing.T) { mockChain := &mockGroupRegistrationInterface{ - groupsToRemove: [][]byte{}, + groupsToRemove: [][]byte{}, groupsCheckedIfStale: make(map[string]bool), } @@ -197,6 +197,12 @@ func (mgri *mockGroupRegistrationInterface) OnGroupRegistered( panic("not implemented") } +func (mgri *mockGroupRegistrationInterface) IsGroupRegistered( + groupPublicKey []byte, +) (bool, error) { + panic("not implemented") +} + func (mgri *mockGroupRegistrationInterface) IsStaleGroup(groupPublicKey []byte) (bool, error) { mgri.groupsCheckedIfStale[groupKeyToString(groupPublicKey)] = true for _, groupToRemove := range mgri.groupsToRemove { diff --git a/pkg/chain/ethereum/connect.go b/pkg/chain/ethereum/connect.go index 3918717df6..7c39642b6e 100644 --- a/pkg/chain/ethereum/connect.go +++ b/pkg/chain/ethereum/connect.go @@ -3,6 +3,7 @@ package ethereum import ( "context" "fmt" + "github.com/keep-network/keep-core/pkg/beacon/relay/event" "math/big" "sync" @@ -53,6 +54,11 @@ type ethereumChain struct { // nonce. Serializing submission ensures that each nonce is requested after // a previous transaction has been submitted. transactionMutex *sync.Mutex + + // TODO: Temporary helper map. Should be removed once proper RandomBeacon + // v2 implementation is here. + dkgResultSubmissionHandlersMutex sync.Mutex + dkgResultSubmissionHandlers map[int]func(submission *event.DKGResultSubmission) } type ethereumUtilityChain struct { @@ -93,12 +99,13 @@ func connectWithClient( } ec := ðereumChain{ - config: config, - client: addClientWrappers(config, client), - clientRPC: clientRPC, - clientWS: clientWS, - chainID: chainID, - transactionMutex: &sync.Mutex{}, + config: config, + client: addClientWrappers(config, client), + clientRPC: clientRPC, + clientWS: clientWS, + chainID: chainID, + transactionMutex: &sync.Mutex{}, + dkgResultSubmissionHandlers: make(map[int]func(submission *event.DKGResultSubmission)), } blockCounter, err := ethutil.NewBlockCounter(ec.client) @@ -180,7 +187,9 @@ func connectWithClient( } ec.chainConfig = chainConfig - ec.initializeBalanceMonitoring(ctx) + // TODO: Disable balance monitoring to be able to start the client + // without v1 contracts deployed. + // ec.initializeBalanceMonitoring(ctx) return ec, nil } @@ -298,43 +307,16 @@ func (ec *ethereumChain) BlockCounter() (chain.BlockCounter, error) { func fetchChainConfig(ec *ethereumChain) (*relaychain.Config, error) { logger.Infof("fetching relay chain config") - groupSize, err := ec.keepRandomBeaconOperatorContract.GroupSize() - if err != nil { - return nil, fmt.Errorf("error calling GroupSize: [%v]", err) - } - - threshold, err := ec.keepRandomBeaconOperatorContract.GroupThreshold() - if err != nil { - return nil, fmt.Errorf("error calling GroupThreshold: [%v]", err) - } - - ticketSubmissionTimeout, err := - ec.keepRandomBeaconOperatorContract.TicketSubmissionTimeout() - if err != nil { - return nil, fmt.Errorf( - "error calling TicketSubmissionTimeout: [%v]", - err, - ) - } - - resultPublicationBlockStep, err := ec.keepRandomBeaconOperatorContract.ResultPublicationBlockStep() - if err != nil { - return nil, fmt.Errorf( - "error calling ResultPublicationBlockStep: [%v]", - err, - ) - } - - relayEntryTimeout, err := ec.keepRandomBeaconOperatorContract.RelayEntryTimeout() - if err != nil { - return nil, fmt.Errorf("error calling RelayEntryTimeout: [%v]", err) - } + // TODO: Fetch from RandomBeacon v2 contract. + groupSize := 64 + honestThreshold := 33 + resultPublicationBlockStep := 6 + relayEntryTimeout := groupSize * resultPublicationBlockStep return &relaychain.Config{ - GroupSize: int(groupSize.Int64()), - HonestThreshold: int(threshold.Int64()), - TicketSubmissionTimeout: ticketSubmissionTimeout.Uint64(), - ResultPublicationBlockStep: resultPublicationBlockStep.Uint64(), - RelayEntryTimeout: relayEntryTimeout.Uint64(), + GroupSize: groupSize, + HonestThreshold: honestThreshold, + ResultPublicationBlockStep: uint64(resultPublicationBlockStep), + RelayEntryTimeout: uint64(relayEntryTimeout), }, nil } diff --git a/pkg/chain/ethereum/ethereum.go b/pkg/chain/ethereum/ethereum.go index c467746ee1..c214224bc4 100644 --- a/pkg/chain/ethereum/ethereum.go +++ b/pkg/chain/ethereum/ethereum.go @@ -1,9 +1,11 @@ package ethereum import ( + "context" + "encoding/binary" "fmt" "math/big" - "time" + "math/rand" "github.com/ipfs/go-log" @@ -62,98 +64,6 @@ func (ec *ethereumChain) HasMinimumStake(address common.Address) (bool, error) { return ec.keepRandomBeaconOperatorContract.HasMinimumStake(address) } -func (ec *ethereumChain) SubmitTicket(ticket *relayChain.Ticket) *async.EventGroupTicketSubmissionPromise { - submittedTicketPromise := &async.EventGroupTicketSubmissionPromise{} - - failPromise := func(err error) { - failErr := submittedTicketPromise.Fail(err) - if failErr != nil { - logger.Errorf( - "failing promise because of: [%v] failed with: [%v].", - err, - failErr, - ) - } - } - - ticketBytes := ec.packTicket(ticket) - - _, err := ec.keepRandomBeaconOperatorContract.SubmitTicket( - ticketBytes, - ethutil.TransactionOptions{ - GasLimit: 250000, - }, - ) - if err != nil { - failPromise(err) - } - - // TODO: fulfill when submitted - - return submittedTicketPromise -} - -func (ec *ethereumChain) packTicket(ticket *relayChain.Ticket) [32]uint8 { - ticketBytes := []uint8{} - ticketBytes = append(ticketBytes, ticket.Value[:]...) - ticketBytes = append(ticketBytes, common.LeftPadBytes(ticket.Proof.StakerValue.Bytes(), 20)[0:20]...) - ticketBytes = append(ticketBytes, common.LeftPadBytes(ticket.Proof.VirtualStakerIndex.Bytes(), 4)[0:4]...) - - ticketFixedArray := [32]uint8{} - copy(ticketFixedArray[:], ticketBytes[:32]) - - return ticketFixedArray -} - -func (ec *ethereumChain) GetSubmittedTickets() ([]uint64, error) { - return ec.keepRandomBeaconOperatorContract.SubmittedTickets() -} - -func (ec *ethereumChain) GetSelectedParticipants() ([]relayChain.StakerAddress, error) { - var stakerAddresses []relayChain.StakerAddress - fetchParticipants := func() error { - participants, err := ec.keepRandomBeaconOperatorContract.SelectedParticipants() - if err != nil { - return err - } - - stakerAddresses = make([]relayChain.StakerAddress, len(participants)) - for i, participant := range participants { - stakerAddresses[i] = participant.Bytes() - } - - return nil - } - - // The reason behind a retry functionality is Infura's load balancer synchronization - // problem. Whenever a Keep client is connected to Infura, it might experience - // a slight delay with block updates between ethereum clients. One or more - // clients might stay behind and report a block number 'n-1', whereas the - // actual block number is already 'n'. This delay results in error triggering - // a new group selection. To mitigate Infura's sync issue, a Keep client will - // retry calling for selected participants up to 4 times. - // Synchronization issue can occur on any setup where we have more than one - // Ethereum clients behind a load balancer. - const numberOfRetries = 10 - const delay = time.Second - - for i := 1; ; i++ { - err := fetchParticipants() - if err != nil { - if i == numberOfRetries { - return nil, err - } - time.Sleep(delay) - logger.Infof( - "Retrying getting selected participants; attempt [%v]", - i, - ) - } else { - return stakerAddresses, nil - } - } -} - func (ec *ethereumChain) SubmitRelayEntry( entry []byte, ) *async.EventEntrySubmittedPromise { @@ -263,24 +173,18 @@ func (ec *ethereumChain) OnRelayEntryRequested( return subscription } -func (ec *ethereumChain) OnGroupSelectionStarted( - handle func(groupSelectionStart *event.GroupSelectionStart), -) subscription.EventSubscription { - onEvent := func( - newEntry *big.Int, - blockNumber uint64, - ) { - handle(&event.GroupSelectionStart{ - NewEntry: newEntry, - BlockNumber: blockNumber, - }) - } +// TODO: Implement a real SelectGroup function once it is possible on the +// contract side. The current implementation just return a group +// where all members belong to the chain operator. +func (ec *ethereumChain) SelectGroup(seed *big.Int) ([]relayChain.StakerAddress, error) { + groupSize := ec.GetConfig().GroupSize + groupMembers := make([]relayChain.StakerAddress, groupSize) - subscription := ec.keepRandomBeaconOperatorContract.GroupSelectionStarted( - nil, - ).OnEvent(onEvent) + for index := range groupMembers { + groupMembers[index] = ec.accountKey.Address.Bytes() + } - return subscription + return groupMembers, nil } func (ec *ethereumChain) OnGroupRegistered( @@ -305,8 +209,9 @@ func (ec *ethereumChain) OnGroupRegistered( return subscription } +// TODO: Implement for RandomBeacon v2. func (ec *ethereumChain) IsGroupRegistered(groupPublicKey []byte) (bool, error) { - return ec.keepRandomBeaconOperatorContract.IsGroupRegistered(groupPublicKey) + return false, nil } func (ec *ethereumChain) IsStaleGroup(groupPublicKey []byte) (bool, error) { @@ -332,28 +237,67 @@ func (ec *ethereumChain) GetGroupMembers(groupPublicKey []byte) ( return stakerAddresses, nil } +// TODO: Implement a real DkgStarted event subscription once it is possible +// on the contract side. The current implementation generate a fake +// event every 500th block where the seed is the keccak256 of the +// block number. Remember about disabling the subscription +// monitoring loop for the DkgStarted event. +func (ec *ethereumChain) OnDKGStarted( + handler func(event *event.DKGStarted), +) subscription.EventSubscription { + ctx, cancelCtx := context.WithCancel(context.Background()) + blocksChan := ec.blockCounter.WatchBlocks(ctx) + + go func() { + for { + select { + case block := <-blocksChan: + // Generate an event every 500th block. + if block%500 == 0 { + // The seed is keccak256(block). + blockBytes := make([]byte, 8) + binary.BigEndian.PutUint64(blockBytes, block) + seedBytes := crypto.Keccak256(blockBytes) + seed := new(big.Int).SetBytes(seedBytes) + + handler(&event.DKGStarted{ + Seed: seed, + BlockNumber: block, + }) + } + case <-ctx.Done(): + return + } + } + }() + + return subscription.NewEventSubscription(func() { + cancelCtx() + }) +} + +// TODO: Implement a real OnDKGResultSubmitted event subscription once it is +// possible on the contract side. The current implementation just pipes +// the DKG submission event generated within SubmitDKGResult to the +// handlers registered in the dkgResultSubmissionHandlers map. func (ec *ethereumChain) OnDKGResultSubmitted( handler func(dkgResultPublication *event.DKGResultSubmission), ) subscription.EventSubscription { - onEvent := func( - memberIndex *big.Int, - groupPublicKey []byte, - misbehaved []byte, - blockNumber uint64, - ) { - handler(&event.DKGResultSubmission{ - MemberIndex: uint32(memberIndex.Uint64()), - GroupPublicKey: groupPublicKey, - Misbehaved: misbehaved, - BlockNumber: blockNumber, - }) - } + ec.dkgResultSubmissionHandlersMutex.Lock() + defer ec.dkgResultSubmissionHandlersMutex.Unlock() - subscription := ec.keepRandomBeaconOperatorContract.DkgResultSubmittedEvent( - nil, - ).OnEvent(onEvent) + // #nosec G404 (insecure random number source (rand)) + // Temporary test implementation doesn't require secure randomness. + handlerID := rand.Int() - return subscription + ec.dkgResultSubmissionHandlers[handlerID] = handler + + return subscription.NewEventSubscription(func() { + ec.dkgResultSubmissionHandlersMutex.Lock() + defer ec.dkgResultSubmissionHandlersMutex.Unlock() + + delete(ec.dkgResultSubmissionHandlers, handlerID) + }) } func (ec *ethereumChain) ReportRelayEntryTimeout() error { @@ -386,6 +330,11 @@ func (ec *ethereumChain) CurrentRequestGroupPublicKey() ([]byte, error) { return ec.keepRandomBeaconOperatorContract.GetGroupPublicKey(currentRequestGroupIndex) } +// TODO: Implement a real SubmitDKGResult action once it is possible on the +// contract side. The current implementation just creates and pipes +// the DKG submission event to the handlers registered in the +// dkgResultSubmissionHandlers map. Consider getting rid of the result +// promise in favor of the fire-and-forget style. func (ec *ethereumChain) SubmitDKGResult( participantIndex relayChain.GroupMemberIndex, result *relayChain.DKGResult, @@ -438,24 +387,26 @@ func (ec *ethereumChain) SubmitDKGResult( } }() - membersIndicesOnChainFormat, signaturesOnChainFormat, err := - convertSignaturesToChainFormat(signatures) + ec.dkgResultSubmissionHandlersMutex.Lock() + defer ec.dkgResultSubmissionHandlersMutex.Unlock() + + blockNumber, err := ec.blockCounter.CurrentBlock() if err != nil { close(publishedResult) - failPromise(fmt.Errorf("converting signatures failed [%v]", err)) + subscription.Unsubscribe() + failPromise(err) return resultPublicationPromise } - if _, err = ec.keepRandomBeaconOperatorContract.SubmitDkgResult( - big.NewInt(int64(participantIndex)), - result.GroupPublicKey, - result.Misbehaved, - signaturesOnChainFormat, - membersIndicesOnChainFormat, - ); err != nil { - subscription.Unsubscribe() - close(publishedResult) - failPromise(err) + for _, handler := range ec.dkgResultSubmissionHandlers { + go func(handler func(*event.DKGResultSubmission)) { + handler(&event.DKGResultSubmission{ + MemberIndex: uint32(participantIndex), + GroupPublicKey: result.GroupPublicKey, + Misbehaved: result.Misbehaved, + BlockNumber: blockNumber, + }) + }(handler) } return resultPublicationPromise diff --git a/pkg/chain/ethereum/ethereum_test.go b/pkg/chain/ethereum/ethereum_test.go index 57d38cf551..ac1eb9da05 100644 --- a/pkg/chain/ethereum/ethereum_test.go +++ b/pkg/chain/ethereum/ethereum_test.go @@ -2,9 +2,7 @@ package ethereum import ( "bytes" - "encoding/hex" "fmt" - "math/big" "reflect" "testing" @@ -148,89 +146,3 @@ func TestConvertSignaturesToChainFormat(t *testing.T) { }) } } - -func TestPackTicket(t *testing.T) { - chain := ðereumChain{} - toBigInt := func(number string) *big.Int { - bigInt, _ := new(big.Int).SetString(number, 10) - return bigInt - } - - toBigIntFromAddress := func(address string) *big.Int { - return new(big.Int).SetBytes(common.HexToAddress(address).Bytes()) - } - - var tests = map[string]struct { - ticketValue [8]byte - stakerValue *big.Int - virtualStakerIndex *big.Int - expectedPacked string - }{ - "virtual staker index minimum value": { - ticketValue: [8]byte{255, 255, 255, 255, 255, 255, 255, 255}, - stakerValue: toBigInt("471938313681866282067432403796053736964016932944"), - virtualStakerIndex: toBigInt("1"), - expectedPacked: "ffffffffffffffff52aa72262c904281c49765499f85a774c459885000000001", - }, - "virtual staker index maximum value": { - ticketValue: [8]byte{255, 255, 255, 255, 255, 255, 255, 255}, - stakerValue: toBigInt("471938313681866282067432403796053736964016932944"), - virtualStakerIndex: toBigInt("4294967295"), - expectedPacked: "ffffffffffffffff52aa72262c904281c49765499f85a774c4598850ffffffff", - }, - "zero ticket value": { - ticketValue: [8]byte{0, 0, 0, 0, 0, 0, 0, 0}, - stakerValue: toBigInt("640134992772870476466797915370027482254406660188"), - virtualStakerIndex: toBigInt("12"), - expectedPacked: "00000000000000007020a5556ba1ce5f92c81063a13d33512cf1305c0000000c", - }, - "low ticket value": { - ticketValue: [8]byte{0, 0, 0, 0, 0, 0, 255, 255}, - stakerValue: toBigInt("640134992772870476466797915370027482254406660188"), - virtualStakerIndex: toBigInt("12"), - expectedPacked: "000000000000ffff7020a5556ba1ce5f92c81063a13d33512cf1305c0000000c", - }, - "staker value is derived from an address without leading zeros": { - ticketValue: [8]byte{255, 255, 255, 255, 255, 255, 255, 255}, - stakerValue: toBigIntFromAddress("0x13b6b8e2cb25f86aa9f3f4eb55ff92684c6efb2d"), - virtualStakerIndex: toBigInt("1"), - expectedPacked: "ffffffffffffffff13b6b8e2cb25f86aa9f3f4eb55ff92684c6efb2d00000001", - }, - "staker value is derived from an address with one leading zero": { - ticketValue: [8]byte{255, 255, 255, 255, 255, 255, 255, 255}, - stakerValue: toBigIntFromAddress("0x017fe79753873f1e87085ab6972715c6c12015e6"), - virtualStakerIndex: toBigInt("1"), - expectedPacked: "ffffffffffffffff017fe79753873f1e87085ab6972715c6c12015e600000001", - }, - "staker value is derived from an address with two leading zeros": { - ticketValue: [8]byte{255, 255, 255, 255, 255, 255, 255, 255}, - stakerValue: toBigIntFromAddress("0x00a1b551e309e0bf36388e549d075222a3197e0c"), - virtualStakerIndex: toBigInt("1"), - expectedPacked: "ffffffffffffffff00a1b551e309e0bf36388e549d075222a3197e0c00000001", - }, - } - - for testName, test := range tests { - t.Run(testName, func(t *testing.T) { - ticket := &relaychain.Ticket{ - Value: test.ticketValue, - Proof: &relaychain.TicketProof{ - StakerValue: test.stakerValue, - VirtualStakerIndex: test.virtualStakerIndex, - }, - } - - actualTicketBytes := chain.packTicket(ticket) - - expectedTicketBytes, _ := hex.DecodeString(test.expectedPacked) - - if !bytes.Equal(expectedTicketBytes, actualTicketBytes[:]) { - t.Errorf( - "\nexpected: %v\nactual: %x\n", - test.expectedPacked, - actualTicketBytes, - ) - } - }) - } -} diff --git a/pkg/chain/local/local.go b/pkg/chain/local/local.go index 17a80b0fce..07c8b4c79d 100644 --- a/pkg/chain/local/local.go +++ b/pkg/chain/local/local.go @@ -2,11 +2,9 @@ package local import ( "bytes" - "encoding/binary" "fmt" "math/big" "math/rand" - "sort" "sync" "github.com/ipfs/go-log" @@ -58,20 +56,17 @@ type localChain struct { lastSubmittedDKGResultSignatures map[relaychain.GroupMemberIndex][]byte lastSubmittedRelayEntry []byte - handlerMutex sync.Mutex - relayEntryHandlers map[int]func(entry *event.EntrySubmitted) - relayRequestHandlers map[int]func(request *event.Request) - groupSelectionStartedHandlers map[int]func(groupSelectionStart *event.GroupSelectionStart) - groupRegisteredHandlers map[int]func(groupRegistration *event.GroupRegistration) - resultSubmissionHandlers map[int]func(submission *event.DKGResultSubmission) + handlerMutex sync.Mutex + relayEntryHandlers map[int]func(entry *event.EntrySubmitted) + relayRequestHandlers map[int]func(request *event.Request) + groupRegisteredHandlers map[int]func(groupRegistration *event.GroupRegistration) + dkgStartedHandlers map[int]func(submission *event.DKGStarted) + resultSubmissionHandlers map[int]func(submission *event.DKGResultSubmission) simulatedHeight uint64 stakeMonitor chain.StakeMonitor blockCounter chain.BlockCounter - tickets []*relaychain.Ticket - ticketsMutex sync.Mutex - relayEntryTimeoutReportsMutex sync.Mutex relayEntryTimeoutReports []uint64 @@ -100,71 +95,7 @@ func (c *localChain) GetConfig() *relaychain.Config { return c.relayConfig } -func (c *localChain) SubmitTicket(ticket *relaychain.Ticket) *async.EventGroupTicketSubmissionPromise { - promise := &async.EventGroupTicketSubmissionPromise{} - - c.ticketsMutex.Lock() - defer c.ticketsMutex.Unlock() - - c.tickets = append(c.tickets, ticket) - sort.SliceStable(c.tickets, func(i, j int) bool { - // Ticket value bytes are interpreted as a big-endian unsigned integers. - iValue := new(big.Int).SetBytes(c.tickets[i].Value[:]) - jValue := new(big.Int).SetBytes(c.tickets[j].Value[:]) - - return iValue.Cmp(jValue) == -1 - }) - - err := promise.Fulfill(&event.GroupTicketSubmission{ - TicketValue: new(big.Int).SetBytes(ticket.Value[:]), - BlockNumber: c.simulatedHeight, - }) - if err != nil { - logger.Errorf("failed to fulfill promise: [%v]", err) - } - - return promise -} - -func (c *localChain) GetSubmittedTickets() ([]uint64, error) { - tickets := make([]uint64, len(c.tickets)) - - for i := range tickets { - tickets[i] = binary.BigEndian.Uint64(c.tickets[i].Value[:]) - } - - return tickets, nil -} - -func (c *localChain) GetSelectedParticipants() ([]relaychain.StakerAddress, error) { - c.ticketsMutex.Lock() - defer c.ticketsMutex.Unlock() - - selectTickets := func() []*relaychain.Ticket { - if len(c.tickets) <= c.relayConfig.GroupSize { - return c.tickets - } - - selectedTickets := make([]*relaychain.Ticket, c.relayConfig.GroupSize) - copy(selectedTickets, c.tickets) - return selectedTickets - } - - selectedTickets := selectTickets() - - selectedParticipants := make([]relaychain.StakerAddress, len(selectedTickets)) - for i, ticket := range selectedTickets { - selectedParticipants[i] = ticket.Proof.StakerValue.Bytes() - } - - return selectedParticipants, nil -} - func (c *localChain) SubmitRelayEntry(newEntry []byte) *async.EventEntrySubmittedPromise { - c.ticketsMutex.Lock() - c.tickets = make([]*relaychain.Ticket, 0) - c.ticketsMutex.Unlock() - relayEntryPromise := &async.EventEntrySubmittedPromise{} currentBlock, err := c.blockCounter.CurrentBlock() @@ -239,21 +170,8 @@ func (c *localChain) OnRelayEntryRequested( }) } -func (c *localChain) OnGroupSelectionStarted( - handler func(entry *event.GroupSelectionStart), -) subscription.EventSubscription { - c.handlerMutex.Lock() - defer c.handlerMutex.Unlock() - - handlerID := generateHandlerID() - c.groupSelectionStartedHandlers[handlerID] = handler - - return subscription.NewEventSubscription(func() { - c.handlerMutex.Lock() - defer c.handlerMutex.Unlock() - - delete(c.groupSelectionStartedHandlers, handlerID) - }) +func (c *localChain) SelectGroup(seed *big.Int) ([]relaychain.StakerAddress, error) { + panic("not implemented") } func (c *localChain) OnGroupRegistered( @@ -315,17 +233,16 @@ func ConnectWithKey( relayConfig: &relaychain.Config{ GroupSize: groupSize, HonestThreshold: honestThreshold, - TicketSubmissionTimeout: 6, ResultPublicationBlockStep: resultPublicationBlockStep, RelayEntryTimeout: resultPublicationBlockStep * uint64(groupSize), }, relayEntryHandlers: make(map[int]func(request *event.EntrySubmitted)), relayRequestHandlers: make(map[int]func(request *event.Request)), groupRegisteredHandlers: make(map[int]func(groupRegistration *event.GroupRegistration)), + dkgStartedHandlers: make(map[int]func(submission *event.DKGStarted)), resultSubmissionHandlers: make(map[int]func(submission *event.DKGResultSubmission)), blockCounter: bc, stakeMonitor: NewStakeMonitor(minimumStake), - tickets: make([]*relaychain.Ticket, 0), groups: []localGroup{group}, operatorPrivateKey: operatorPrivateKey, minimumStake: minimumStake, @@ -457,6 +374,23 @@ func (c *localChain) SubmitDKGResult( return dkgResultPublicationPromise } +func (c *localChain) OnDKGStarted( + handler func(event *event.DKGStarted), +) subscription.EventSubscription { + c.handlerMutex.Lock() + defer c.handlerMutex.Unlock() + + handlerID := generateHandlerID() + c.dkgStartedHandlers[handlerID] = handler + + return subscription.NewEventSubscription(func() { + c.handlerMutex.Lock() + defer c.handlerMutex.Unlock() + + delete(c.dkgStartedHandlers, handlerID) + }) +} + func (c *localChain) OnDKGResultSubmitted( handler func(dkgResultPublication *event.DKGResultSubmission), ) subscription.EventSubscription { diff --git a/pkg/chain/local/local_test.go b/pkg/chain/local/local_test.go index 503b340fbd..83d1937de5 100644 --- a/pkg/chain/local/local_test.go +++ b/pkg/chain/local/local_test.go @@ -14,100 +14,6 @@ import ( relaychain "github.com/keep-network/keep-core/pkg/beacon/relay/chain" ) -func TestSubmitTicketAndGetSelectedParticipants(t *testing.T) { - groupSize := 4 - - generateTicket := func(index int64) *relaychain.Ticket { - var value [8]byte - copy(value[:], common.LeftPadBytes(big.NewInt(10*index).Bytes(), 8)) - - return &relaychain.Ticket{ - Value: value, - Proof: &relaychain.TicketProof{ - StakerValue: big.NewInt(100 * index), - VirtualStakerIndex: big.NewInt(index), - }, - } - } - - ticket1 := generateTicket(1) - ticket2 := generateTicket(2) - ticket3 := generateTicket(3) - ticket4 := generateTicket(4) - ticket5 := generateTicket(5) - ticket6 := generateTicket(6) - - var tests = map[string]struct { - submitTickets func(chain relaychain.Interface) - expectedSelectedTickets []*relaychain.Ticket - }{ - "number of tickets is less than group size": { - submitTickets: func(chain relaychain.Interface) { - chain.SubmitTicket(ticket3) - chain.SubmitTicket(ticket1) - chain.SubmitTicket(ticket2) - }, - expectedSelectedTickets: []*relaychain.Ticket{ - ticket1, ticket2, ticket3, - }, - }, - "number of tickets is same as group size": { - submitTickets: func(chain relaychain.Interface) { - chain.SubmitTicket(ticket3) - chain.SubmitTicket(ticket1) - chain.SubmitTicket(ticket4) - chain.SubmitTicket(ticket2) - }, - expectedSelectedTickets: []*relaychain.Ticket{ - ticket1, ticket2, ticket3, ticket4, - }, - }, - "number of tickets is greater than group size": { - submitTickets: func(chain relaychain.Interface) { - chain.SubmitTicket(ticket3) - chain.SubmitTicket(ticket1) - chain.SubmitTicket(ticket4) - chain.SubmitTicket(ticket6) - chain.SubmitTicket(ticket5) - chain.SubmitTicket(ticket2) - }, - expectedSelectedTickets: []*relaychain.Ticket{ - ticket1, ticket2, ticket3, ticket4, - }, - }, - } - - for testName, test := range tests { - t.Run(testName, func(t *testing.T) { - c := Connect(groupSize, 4, big.NewInt(200)) - chain := c.ThresholdRelay() - - test.submitTickets(chain) - - actualSelectedParticipants, err := chain.GetSelectedParticipants() - if err != nil { - t.Fatal(err) - } - - expectedSelectedParticipants := make( - []relaychain.StakerAddress, - len(test.expectedSelectedTickets), - ) - for i, ticket := range test.expectedSelectedTickets { - expectedSelectedParticipants[i] = ticket.Proof.StakerValue.Bytes() - } - - if !reflect.DeepEqual(expectedSelectedParticipants, actualSelectedParticipants) { - t.Fatalf( - "\nexpected: %v\nactual: %v\n", - expectedSelectedParticipants, - actualSelectedParticipants, - ) - } - }) - } -} - func TestLocalSubmitRelayEntry(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() diff --git a/pkg/chain/staker.go b/pkg/chain/staker.go index 854b3e45f6..ef395d4fa1 100644 --- a/pkg/chain/staker.go +++ b/pkg/chain/staker.go @@ -7,6 +7,9 @@ import ( ) // Staker represents a single staker in the system. +// DEPRECATED +// TODO: The "staker" should probably become "operator" to reflect random +// beacon v2 structure. type Staker interface { // Address returns staker's address Address() relaychain.StakerAddress diff --git a/pkg/gen/async.go b/pkg/gen/async.go index 265780093e..76c2d3b983 100644 --- a/pkg/gen/async.go +++ b/pkg/gen/async.go @@ -1,3 +1,3 @@ package gen -//go:generate sh -c "rm -f ./async/*; go run github.com/keep-network/keep-common/tools/generators/promise/ -d ./async *event.EntrySubmitted *event.GroupTicketSubmission *event.GroupRegistration *event.Request *event.DKGResultSubmission *event.EntryGenerated" +//go:generate sh -c "rm -f ./async/*; go run github.com/keep-network/keep-common/tools/generators/promise/ -d ./async *event.EntrySubmitted *event.GroupRegistration *event.Request *event.DKGResultSubmission *event.EntryGenerated" diff --git a/pkg/gen/async/event_group_ticket_submission_promise.go b/pkg/gen/async/event_group_ticket_submission_promise.go deleted file mode 100644 index 4bcf966137..0000000000 --- a/pkg/gen/async/event_group_ticket_submission_promise.go +++ /dev/null @@ -1,154 +0,0 @@ -// Package async code is auto generated. -package async - -import ( - "fmt" - "sync" - - "github.com/keep-network/keep-core/pkg/beacon/relay/event" -) - -// EventGroupTicketSubmissionPromise represents an eventual completion of an -// ansynchronous operation and its resulting value. Promise can be either -// fulfilled or failed and it can happen only one time. All Promise operations -// are thread-safe. -// -// To create a promise use: `&EventGroupTicketSubmissionPromise{}` -type EventGroupTicketSubmissionPromise struct { - mutex sync.Mutex - successFn func(*event.GroupTicketSubmission) - failureFn func(error) - completeFn func(*event.GroupTicketSubmission, error) - - isComplete bool - value *event.GroupTicketSubmission - err error -} - -// OnSuccess registers a function to be called when the Promise -// has been fulfilled. In case of a failed Promise, function is not -// called at all. OnSuccess is a non-blocking operation. Only one on success -// function can be registered for a Promise. If the Promise has been already -// fulfilled, the function is called immediatelly. -func (p *EventGroupTicketSubmissionPromise) OnSuccess(onSuccess func(*event.GroupTicketSubmission)) *EventGroupTicketSubmissionPromise { - p.mutex.Lock() - defer p.mutex.Unlock() - - p.successFn = onSuccess - - if p.isComplete && p.err == nil { - p.callSuccessFn() - } - - return p -} - -// OnFailure registers a function to be called when the Promise -// execution failed. In case of a fulfilled Promise, function is not -// called at all. OnFailure is a non-blocking operation. Only one on failure -// function can be registered for a Promise. If the Promise has already failed, -// the function is called immediatelly. -func (p *EventGroupTicketSubmissionPromise) OnFailure(onFailure func(error)) *EventGroupTicketSubmissionPromise { - p.mutex.Lock() - defer p.mutex.Unlock() - - p.failureFn = onFailure - - if p.isComplete && p.err != nil { - p.callFailureFn() - } - - return p -} - -// OnComplete registers a function to be called when the Promise -// execution completed no matter if it succeded or failed. -// In case of a successful execution, error passed to the callback -// function is nil. In case of a failed execution, there is no -// value evaluated so the value parameter is nil. OnComplete is -// a non-blocking operation. Only one on complete function can be -// registered for a Promise. If the Promise has already completed, -// the function is called immediatelly. -func (p *EventGroupTicketSubmissionPromise) OnComplete(onComplete func(*event.GroupTicketSubmission, error)) *EventGroupTicketSubmissionPromise { - p.mutex.Lock() - defer p.mutex.Unlock() - - p.completeFn = onComplete - - if p.isComplete { - p.callCompleteFn() - } - - return p -} - -// Fulfill can happen only once for a Promise and it results in calling -// the OnSuccess callback, if registered. If Promise has been already -// completed by either fulfilling or failing, this function reports -// an error. -func (p *EventGroupTicketSubmissionPromise) Fulfill(value *event.GroupTicketSubmission) error { - p.mutex.Lock() - defer p.mutex.Unlock() - - if p.isComplete { - return fmt.Errorf("promise already completed") - } - - p.isComplete = true - p.value = value - - p.callSuccessFn() - p.callCompleteFn() - - return nil -} - -// Fail can happen only once for a Promise and it results in calling -// the OnFailure callback, if registered. If Promise has been already -// completed by either fulfilling or failing, this function reports -// an error. Also, this function reports an error if `err` parameter -// is `nil`. -func (p *EventGroupTicketSubmissionPromise) Fail(err error) error { - p.mutex.Lock() - defer p.mutex.Unlock() - - if err == nil { - return fmt.Errorf("error cannot be nil") - } - - if p.isComplete { - return fmt.Errorf("promise already completed") - } - - p.isComplete = true - p.err = err - - p.callFailureFn() - p.callCompleteFn() - - return nil -} - -func (p *EventGroupTicketSubmissionPromise) callCompleteFn() { - if p.completeFn != nil { - go func() { - p.completeFn(p.value, p.err) - }() - } -} - -func (p *EventGroupTicketSubmissionPromise) callSuccessFn() { - if p.successFn != nil { - go func() { - p.successFn(p.value) - }() - } -} - -func (p *EventGroupTicketSubmissionPromise) callFailureFn() { - if p.failureFn != nil { - go func() { - p.failureFn(p.err) - }() - } -}