Skip to content

Commit

Permalink
Feature/migrate guardian endpoints (#1802)
Browse files Browse the repository at this point in the history
* migrate /v1/governor/available_notional_by_chain

* governor

* adapt grpc controller

* fix db annotations

* fix identation

* migrate /governor/enqueued_vaas

* migrate governor/is_vaa_enqueued

* migrate governor/token_list

* migrate /guardianset/current

* migrate /v1/heartbeats

* adjust endpoint /v1/signed_vaa/:chain_id/:emitter/:seq

* small change
  • Loading branch information
marianososto authored and ftocal committed Oct 21, 2024
1 parent 26087e6 commit 8d27653
Show file tree
Hide file tree
Showing 16 changed files with 369 additions and 73 deletions.
22 changes: 19 additions & 3 deletions api/handlers/governor/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,13 @@ type GovernorLimit struct {
// AvailableNotionalByChain definition.
// This is the structure that is used in guardian api grpc api version.
type AvailableNotionalByChain struct {
ChainID vaa.ChainID `db:"chainid" json:"chainId"`
AvailableNotional uint64 `db:"availablenotional" json:"remainingAvailableNotional"`
NotionalLimit uint64 `db:"notionallimit" json:"notionalLimit"`
MaxTransactionSize uint64 `db:"maxtransactionsize" json:"bigTransactionSize"`
}

type availableNotionalByChainMongo struct {
ChainID vaa.ChainID `bson:"chainId" json:"chainId"`
AvailableNotional mongo.Uint64 `bson:"availableNotional" json:"remainingAvailableNotional"`
NotionalLimit mongo.Uint64 `bson:"notionalLimit" json:"notionalLimit"`
Expand All @@ -297,13 +304,22 @@ type AvailableNotionalByChain struct {

// TokenList definition
type TokenList struct {
OriginChainID vaa.ChainID `bson:"originchainid" json:"originChainId"`
OriginAddress string `bson:"originaddress" json:"originAddress"`
Price float32 `bson:"price" json:"price"`
OriginChainID vaa.ChainID `db:"originchainid" bson:"originchainid" json:"originChainId"`
OriginAddress string `db:"originaddress" bson:"originaddress" json:"originAddress"`
Price float32 `db:"price" bson:"price" json:"price"`
}

// EnqueuedVaaItem definition
type EnqueuedVaaItem struct {
EmitterChain vaa.ChainID `db:"chainid" json:"emitterChain"`
EmitterAddress string `db:"emitteraddress" json:"emitterAddress"`
Sequence string `db:"sequence" json:"sequence"`
ReleaseTime int64 `db:"releasetime" json:"releaseTime"`
NotionalValue uint64 `db:"notionalvalue" json:"notionalValue"`
TxHash string `db:"txhash" json:"txHash"`
}

type enqueuedVaaItemMongo struct {
EmitterChain vaa.ChainID `bson:"chainid" json:"emitterChain"`
EmitterAddress string `bson:"emitteraddress" json:"emitterAddress"`
Sequence string `bson:"sequence" json:"sequence"`
Expand Down
30 changes: 26 additions & 4 deletions api/handlers/governor/mongo_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -1485,7 +1485,7 @@ func (r *MongoRepository) GetAvailNotionByChain(
}

// decodes to GovernorLimitV2.
var availbleNotional []*AvailableNotionalByChain
var availbleNotional []*availableNotionalByChainMongo
err = cur.All(ctx, &availbleNotional)
if err != nil {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
Expand All @@ -1501,7 +1501,17 @@ func (r *MongoRepository) GetAvailNotionByChain(
return nil, errs.ErrNotFound
}

return availbleNotional, nil
var result []*AvailableNotionalByChain
for _, an := range availbleNotional {
result = append(result, &AvailableNotionalByChain{
ChainID: an.ChainID,
NotionalLimit: uint64(an.NotionalLimit),
MaxTransactionSize: uint64(an.MaxTransactionSize),
AvailableNotional: uint64(an.AvailableNotional),
})
}

return result, nil
}

// GetTokenList get token lists.
Expand Down Expand Up @@ -1720,7 +1730,7 @@ func (r *MongoRepository) GetEnqueuedVaas(ctx context.Context) ([]*EnqueuedVaaIt
}

// decodes to []*EnqueuedVaaItem.
var enqueuedVAA []*EnqueuedVaaItem
var enqueuedVAA []*enqueuedVaaItemMongo
err = cur.All(ctx, &enqueuedVAA)
if err != nil {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
Expand All @@ -1731,7 +1741,19 @@ func (r *MongoRepository) GetEnqueuedVaas(ctx context.Context) ([]*EnqueuedVaaIt
return nil, errors.WithStack(err)
}

return enqueuedVAA, nil
var resp []*EnqueuedVaaItem
for _, ev := range enqueuedVAA {
resp = append(resp, &EnqueuedVaaItem{
EmitterChain: ev.EmitterChain,
EmitterAddress: ev.EmitterAddress,
NotionalValue: uint64(ev.NotionalValue),
ReleaseTime: ev.ReleaseTime,
Sequence: ev.Sequence,
TxHash: ev.TxHash,
})
}

return resp, nil
}

type EnqueuedResponse struct {
Expand Down
141 changes: 141 additions & 0 deletions api/handlers/governor/postgres_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/wormhole-foundation/wormhole-explorer/common/types"
"github.com/wormhole-foundation/wormhole-explorer/common/utils"
"sort"
"strconv"
"time"
Expand Down Expand Up @@ -486,6 +488,39 @@ func (r *PostgresRepository) GetAvailableNotional(
return result, err
}

func (r *PostgresRepository) GetAvailNotionByChain(
ctx context.Context,
) ([]*AvailableNotionalByChain, error) {

query := `
WITH gov_status_chain AS (SELECT gov_status_msg.value as status_msg,
gov_config_chains.value as config_chains
FROM wormholescan.wh_governor_status,
wormholescan.wh_governor_config,
jsonb_array_elements(wormholescan.wh_governor_status.message) AS gov_status_msg,
jsonb_array_elements(wormholescan.wh_governor_config.chains) AS gov_config_chains
WHERE wh_governor_config.id = wh_governor_status.id)
SELECT DISTINCT ON ((status_msg ->> 'chainid')::int) (status_msg ->> 'chainid')::int as chainid,
(status_msg ->> 'remainingavailablenotional')::numeric as availablenotional,
(config_chains ->> 'notionallimit')::numeric as notionallimit,
(config_chains ->> 'bigtransactionsize')::numeric as maxtransactionsize
FROM gov_status_chain
WHERE gov_status_chain.status_msg ->> 'chainid' = gov_status_chain.config_chains ->> 'chainid'
ORDER BY (status_msg ->> 'chainid')::int,
(status_msg ->> 'remainingavailablenotional')::numeric;
`

var result []*AvailableNotionalByChain

err := r.db.Select(ctx, &result, query)
if err != nil {
r.logger.Error("failed to execute query", zap.Error(err), zap.String("query", query))
}

return result, err

}

func (r *PostgresRepository) GetAvailableNotionalByChainID(
ctx context.Context,
q *NotionalLimitQuery,
Expand Down Expand Up @@ -810,3 +845,109 @@ func (r *PostgresRepository) GetEnqueueVassByChainID(ctx context.Context, q *Enq

return response, nil
}

func (r *PostgresRepository) GetEnqueuedVaas(ctx context.Context) ([]*EnqueuedVaaItem, error) {
query := `
WITH gov_status_msgs AS (SELECT gov_status_msg.value as status_msg,
(gov_status_msg ->> 'chainid')::smallint as chain_id
FROM wormholescan.wh_governor_status,
jsonb_array_elements(wormholescan.wh_governor_status.message) AS gov_status_msg),
gov_status_enqueuedvaas AS (SELECT chain_id,
emitters -> 'enqueuedvaas' as enqueuedVaas,
emitters ->> 'emitteraddress' as emitter_address
FROM gov_status_msgs,
jsonb_array_elements(gov_status_msgs.status_msg -> 'emitters') as emitters
WHERE emitters ->> 'enqueuedvaas' IS NOT NULL)
SELECT chain_id as chainid,
emitter_address as emitteraddress,
(vaas ->> 'sequence')::bigint as sequence,
(vaas ->> 'releasetime')::bigint as releasetime,
(vaas ->> 'notionalvalue')::numeric as notionalvalue,
vaas ->> 'txhash' as txhash
FROM gov_status_enqueuedvaas,
jsonb_array_elements(gov_status_enqueuedvaas.enqueuedVaas) as vaas
ORDER BY chainid, emitteraddress, sequence, releasetime DESC;`

var result []*EnqueuedVaaItem

err := r.db.Select(ctx, &result, query)
if err != nil {
r.logger.Error("failed to execute query to get enqueued VAAs",
zap.Error(err),
zap.String("query", query))
return nil, err
}

return result, nil
}

func (r *PostgresRepository) IsVaaEnqueued(ctx context.Context, chainID sdk.ChainID, emitterAddr *types.Address, seq string) (bool, error) {
query := `
WITH gov_status_msgs AS (SELECT gov_status_msg.value as status_msg,
(gov_status_msg ->> 'chainid')::smallint as chain_id
FROM wormholescan.wh_governor_status,
jsonb_array_elements(wormholescan.wh_governor_status.message) AS gov_status_msg
WHERE gov_status_msg ->> 'chainid' = $1),
gov_status_enqueuedvaas AS (SELECT chain_id,
emitters -> 'enqueuedvaas' as enqueuedVaas,
emitters ->> 'emitteraddress' as emitter_address
FROM gov_status_msgs,
jsonb_array_elements(gov_status_msgs.status_msg -> 'emitters') as emitters
WHERE emitters ->> 'enqueuedvaas' IS NOT NULL AND emitters ->> 'emitteraddress' = $2)
SELECT chain_id as chainid,
emitter_address as emitteraddress,
(vaas ->> 'sequence')::bigint as sequence,
(vaas ->> 'releasetime')::bigint as releasetime,
(vaas ->> 'notionalvalue')::numeric as notionalvalue,
vaas ->> 'txhash' as txhash
FROM gov_status_enqueuedvaas,
jsonb_array_elements(gov_status_enqueuedvaas.enqueuedVaas) as vaas
WHERE (vaas ->> 'sequence')::bigint = $3;`

var result []*EnqueuedVaaItem
chainIDStr := strconv.Itoa(int(chainID))
addr := utils.DenormalizeHex(emitterAddr.Hex())
err := r.db.Select(ctx, &result, query, chainIDStr, addr, seq)
if err != nil {
r.logger.Error("failed to execute query to get enqueued VAAs",
zap.Error(err),
zap.String("query", query))
return false, err
}

return len(result) > 0, nil
}

func (r *PostgresRepository) GetTokenList(ctx context.Context) ([]*TokenList, error) {
query := `
WITH governor_cfg_tokens AS (SELECT (gov_cfg_tokens ->> 'originchainid')::smallint as originchainid,
gov_cfg_tokens ->> 'originaddress' as originaddress,
(gov_cfg_tokens ->> 'price')::numeric as price
FROM wormholescan.wh_governor_config,
jsonb_array_elements(wormholescan.wh_governor_config.tokens) AS gov_cfg_tokens),
price_counts AS (SELECT originchainid,
originaddress,
price,
COUNT(*) as occurrences
FROM governor_cfg_tokens
GROUP BY originchainid, originaddress, price)
SELECT DISTINCT ON (originchainid, originaddress) originchainid,
originaddress,
price
FROM price_counts
ORDER BY originchainid,
originaddress,
occurrences DESC;
`

var result []*TokenList
err := r.db.Select(ctx, &result, query)
if err != nil {
r.logger.Error("failed to execute query to get token list",
zap.Error(err),
zap.String("query", query))
return nil, err
}

return result, nil
}
33 changes: 26 additions & 7 deletions api/handlers/governor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,29 +290,42 @@ func (s *Service) GetGovernorLimit(ctx context.Context, usePostgres bool, p *pag

// GetAvailNotionByChain get governor limit for each chainID.
// Guardian api migration.
func (s *Service) GetAvailNotionByChain(ctx context.Context) ([]*AvailableNotionalByChain, error) {
func (s *Service) GetAvailNotionByChain(ctx context.Context, usePostgres bool) ([]*AvailableNotionalByChain, error) {
key := availableNotionByChain
return cacheable.GetOrLoad(ctx, s.logger, s.cache, 1*time.Minute, key, s.metrics,
func() ([]*AvailableNotionalByChain, error) {
if usePostgres {
return s.postgresRepo.GetAvailNotionByChain(ctx)
}
return s.mongoRepo.GetAvailNotionByChain(ctx)
})
}

// Get governor token list.
// Guardian api migration.
func (s *Service) GetTokenList(ctx context.Context) ([]*TokenList, error) {
func (s *Service) GetTokenList(ctx context.Context, postgres bool) ([]*TokenList, error) {
key := tokenList
return cacheable.GetOrLoad(ctx, s.logger, s.cache, 1*time.Minute, key, s.metrics,
func() ([]*TokenList, error) {
return s.mongoRepo.GetTokenList(ctx)
if postgres {
return s.postgresRepo.GetTokenList(ctx)
} else {
return s.mongoRepo.GetTokenList(ctx)
}
})

}

// GetEnqueuedVaas get enqueued vaas.
// Guardian api migration.
func (s *Service) GetEnqueuedVaas(ctx context.Context) ([]*EnqueuedVaaItem, error) {
entries, err := s.mongoRepo.GetEnqueuedVaas(ctx)
func (s *Service) GetEnqueuedVaas(ctx context.Context, postgres bool) ([]*EnqueuedVaaItem, error) {
var entries []*EnqueuedVaaItem
var err error
if postgres {
entries, err = s.postgresRepo.GetEnqueuedVaas(ctx)
} else {
entries, err = s.mongoRepo.GetEnqueuedVaas(ctx)
}
if err != nil {
return nil, err
}
Expand All @@ -331,8 +344,14 @@ func (s *Service) GetEnqueuedVaas(ctx context.Context) ([]*EnqueuedVaaItem, erro

// IsVaaEnqueued check vaa is enqueued.
// Guardian api migration.
func (s *Service) IsVaaEnqueued(ctx context.Context, chainID vaa.ChainID, emitter *types.Address, seq string) (bool, error) {
isEnqueued, err := s.mongoRepo.IsVaaEnqueued(ctx, chainID, emitter, seq)
func (s *Service) IsVaaEnqueued(ctx context.Context, chainID vaa.ChainID, emitter *types.Address, seq string, usePostgres bool) (bool, error) {
var isEnqueued bool
var err error
if usePostgres {
isEnqueued, err = s.postgresRepo.IsVaaEnqueued(ctx, chainID, emitter, seq)
} else {
isEnqueued, err = s.mongoRepo.IsVaaEnqueued(ctx, chainID, emitter, seq)
}
return isEnqueued, err
}

Expand Down
47 changes: 31 additions & 16 deletions api/handlers/guardian/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,51 @@ import (
)

type Service struct {
repo *repository.MongoGuardianSetRepository
p2pNetwork string
cache cache.Cache
metrics metrics.Metrics
logger *zap.Logger
mongoGSRepository *repository.MongoGuardianSetRepository
postgresGSRepository *repository.PostgresGuardianSetRepository
p2pNetwork string
cache cache.Cache
metrics metrics.Metrics
logger *zap.Logger
}

const currentGuardianSetKey = "current-guardian-set"

func NewService(repo *repository.MongoGuardianSetRepository, p2pNetwork string, cache cache.Cache,
metrics metrics.Metrics, logger *zap.Logger) *Service {
func NewService(
mongoGSRepo *repository.MongoGuardianSetRepository,
postgresGSRepo *repository.PostgresGuardianSetRepository,
p2pNetwork string,
cache cache.Cache,
metrics metrics.Metrics,
logger *zap.Logger,
) *Service {
return &Service{
repo: repo,
p2pNetwork: p2pNetwork,
cache: cache,
metrics: metrics,
logger: logger.With(zap.String("module", "GuardianService")),
mongoGSRepository: mongoGSRepo,
postgresGSRepository: postgresGSRepo,
p2pNetwork: p2pNetwork,
cache: cache,
metrics: metrics,
logger: logger.With(zap.String("module", "GuardianService")),
}
}

func (s *Service) GetGuardianSet(ctx context.Context) (*GuardianSet, error) {
func (s *Service) GetGuardianSet(ctx context.Context, usePostgres bool) (*GuardianSet, error) {
return cacheable.GetOrLoad(ctx, s.logger, s.cache, 1*time.Minute, currentGuardianSetKey, s.metrics,
func() (*GuardianSet, error) {
return s.getGuardianSet(ctx)
return s.getGuardianSet(ctx, usePostgres)
})
}

func (s *Service) getGuardianSet(ctx context.Context) (*GuardianSet, error) {
func (s *Service) getGuardianSet(ctx context.Context, usePostgres bool) (*GuardianSet, error) {

var docs []*repository.GuardianSet
var err error
if usePostgres {
docs, err = s.postgresGSRepository.FindAll(ctx)
} else {
docs, err = s.mongoGSRepository.FindAll(ctx)
}

docs, err := s.repo.FindAll(ctx)
if err != nil {
s.logger.Error("failed to get guardian set from repository", zap.Error(err))
gs := getByEnv(s.p2pNetwork)
Expand Down
Loading

0 comments on commit 8d27653

Please sign in to comment.