Skip to content

Commit

Permalink
Add E2E test for indexing from the chain
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Aug 29, 2024
1 parent 3eca2e7 commit bbf7ce1
Show file tree
Hide file tree
Showing 17 changed files with 247 additions and 18 deletions.
4 changes: 2 additions & 2 deletions cmd/replication/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func addEnvVars() {
options.DB.ReaderConnectionString = connStr
}

if privKey, hasPrivKey := os.LookupEnv("PRIVATE_KEY"); hasPrivKey {
options.PrivateKeyString = privKey
if privKey, hasPrivKey := os.LookupEnv("SIGNING_KEY"); hasPrivKey {
options.SigningKeyString = privKey
}
}

Expand Down
2 changes: 1 addition & 1 deletion dev/run
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ set -eu

go run cmd/replication/main.go \
--db.writer-connection-string=$WRITER_CONNECTION_STRING \
--private-key=${NODE_PRIVATE_KEY} \
--signing-key=${NODE_PRIVATE_KEY} \
--contracts.nodes-address=$NODES_CONTRACT_ADDRESS \
--contracts.messages-address=$GROUP_MESSAGES_CONTRACT_ADDRESS \
--contracts.rpc-url=$CHAIN_RPC_URL
File renamed without changes.
60 changes: 60 additions & 0 deletions pkg/blockchain/groupMessagePublisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package blockchain

import (
"context"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/config"
"go.uber.org/zap"
)

/*
*
Can publish to the blockchain, signing messages directly with the provided private key
*
*/
type GroupMessagePublisher struct {
signer TransactionSigner
contract *abis.GroupMessages
logger *zap.Logger
}

func NewGroupMessagePublisher(
logger *zap.Logger,
client *ethclient.Client,
signer TransactionSigner,
contractOptions config.ContractsOptions,
) (*GroupMessagePublisher, error) {
contract, err := abis.NewGroupMessages(
common.HexToAddress(contractOptions.MessagesContractAddress),
client,
)

if err != nil {
return nil, err
}

return &GroupMessagePublisher{
signer: signer,
logger: logger.Named("GroupMessagePublisher").
With(zap.String("contractAddress", contractOptions.MessagesContractAddress)),
contract: contract,
}, nil
}

func (g *GroupMessagePublisher) Publish(
ctx context.Context,
groupID [32]byte,
message []byte,
) error {
_, err := g.contract.AddMessage(&bind.TransactOpts{
Context: ctx,
From: g.signer.FromAddress(),
Signer: g.signer.SignerFunc(),
}, groupID, message)

return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
Expand All @@ -27,3 +28,12 @@ type ChainClient interface {
ethereum.LogFilterer
ethereum.ChainIDReader
}

type IGroupMessagePublisher interface {
Publish(ctx context.Context, groupID [32]byte, message []byte) error
}

type TransactionSigner interface {
FromAddress() common.Address
SignerFunc() bind.SignerFn
}
File renamed without changes.
File renamed without changes.
52 changes: 52 additions & 0 deletions pkg/blockchain/signer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package blockchain

import (
"crypto/ecdsa"
"fmt"
"math/big"
"strings"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)

type PrivateKeySigner struct {
accountAddress common.Address
signFunction bind.SignerFn
}

func NewPrivateKeySigner(privateKeyString string, chainID int) (*PrivateKeySigner, error) {
privateKey, err := crypto.HexToECDSA(strings.TrimPrefix(privateKeyString, "0x"))
if err != nil {
return nil, fmt.Errorf("unable to parse private key: %v", err)
}

publicKey := privateKey.Public()
publicKeyECDSA, ok := publicKey.(*ecdsa.PublicKey)
if !ok {
return nil, fmt.Errorf("Failed to cast to ECDSA public key %v", err)
}

fromAddress := crypto.PubkeyToAddress(*publicKeyECDSA)
auth, err := bind.NewKeyedTransactorWithChainID(
privateKey,
big.NewInt(int64(chainID)),
)
if err != nil {
return nil, fmt.Errorf("Failed to create transactor: %v", err)
}

return &PrivateKeySigner{
accountAddress: fromAddress,
signFunction: auth.Signer,
}, nil
}

func (s *PrivateKeySigner) FromAddress() common.Address {
return s.accountAddress
}

func (s *PrivateKeySigner) SignerFunc() bind.SignerFn {
return s.signFunction
}
8 changes: 7 additions & 1 deletion pkg/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type ContractsOptions struct {
RpcUrl string `long:"rpc-url" description:"Blockchain RPC URL"`
NodesContractAddress string `long:"nodes-address" description:"Node contract address"`
MessagesContractAddress string `long:"messages-address" description:"Message contract address"`
ChainID int `long:"chain-id" description:"Chain ID for the appchain" default:"31337"`
RefreshInterval time.Duration `long:"refresh-interval" description:"Refresh interval for the nodes registry" default:"60s"`
}

Expand All @@ -24,14 +25,19 @@ type DbOptions struct {
WaitForDB time.Duration `long:"wait-for" description:"wait for DB on start, up to specified duration"`
}

type PayerOptions struct {
PrivateKey string `long:"private-key" description:"Private key used to sign blockchain transactions"`
}

type ServerOptions struct {
LogLevel string `short:"l" long:"log-level" description:"Define the logging level, supported strings are: DEBUG, INFO, WARN, ERROR, DPANIC, PANIC, FATAL, and their lower-case forms." default:"INFO"`
//nolint:staticcheck
LogEncoding string ` long:"log-encoding" description:"Log encoding format. Either console or json" default:"console" choice:"console"`

PrivateKeyString string `long:"private-key" description:"Private key to use for the node"`
SigningKeyString string `long:"signing-key" description:"Signing key used to sign messages"`

API ApiOptions `group:"API Options" namespace:"api"`
DB DbOptions `group:"Database Options" namespace:"db"`
Contracts ContractsOptions `group:"Contracts Options" namespace:"contracts"`
Payer PayerOptions `group:"Payer Options" namespace:"payer"`
}
84 changes: 84 additions & 0 deletions pkg/indexer/e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package indexer

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/blockchain"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/indexer/storer"
"github.com/xmtp/xmtpd/pkg/testutils"
)

func startIndexing(t *testing.T) (*queries.Queries, context.Context, func()) {
ctx, cancel := context.WithCancel(context.Background())
logger := testutils.NewLog(t)
db, _, cleanup := testutils.NewDB(t, ctx)
cfg := testutils.GetContractsOptions(t)
querier := queries.New(db)

err := StartIndexer(ctx, logger, querier, cfg)
require.NoError(t, err)

return querier, ctx, func() {
cleanup()
cancel()
}
}

func messagePublisher(t *testing.T, ctx context.Context) *blockchain.GroupMessagePublisher {
payerCfg := testutils.GetPayerOptions(t)
contractsCfg := testutils.GetContractsOptions(t)
var signer blockchain.TransactionSigner
signer, err := blockchain.NewPrivateKeySigner(payerCfg.PrivateKey, contractsCfg.ChainID)
require.NoError(t, err)

client, err := blockchain.NewClient(ctx, contractsCfg.RpcUrl)
require.NoError(t, err)

publisher, err := blockchain.NewGroupMessagePublisher(
testutils.NewLog(t),
client,
signer,
contractsCfg,
)
require.NoError(t, err)

return publisher
}

func TestStoreMessages(t *testing.T) {
querier, ctx, cleanup := startIndexing(t)
publisher := messagePublisher(t, ctx)
defer cleanup()

message := testutils.RandomBytes(32)
groupID := testutils.RandomGroupID()
topic := []byte(storer.BuildGroupMessageTopic(groupID))

// Publish the message onto the blockchain
require.NoError(t, publisher.Publish(ctx, groupID, message))

// Poll the DB until the stored message shows up
require.Eventually(t, func() bool {
envelopes, err := querier.SelectGatewayEnvelopes(
context.Background(),
queries.SelectGatewayEnvelopesParams{
Topic: topic,
},
)
require.NoError(t, err)

if len(envelopes) == 0 {
return false
}

firstEnvelope := envelopes[0]
require.Equal(t, firstEnvelope.OriginatorEnvelope, message)
require.Equal(t, firstEnvelope.Topic, topic)

return true
}, 5*time.Second, 100*time.Millisecond, "Failed to find indexed envelope")
}
14 changes: 7 additions & 7 deletions pkg/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/blockchain"
"github.com/xmtp/xmtpd/pkg/config"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/indexer/blockchain"
"github.com/xmtp/xmtpd/pkg/indexer/storer"
"github.com/xmtp/xmtpd/pkg/utils"
"go.uber.org/zap"
Expand Down Expand Up @@ -40,23 +40,23 @@ func StartIndexer(
[]common.Hash{messagesTopic},
)

streamer, err := builder.Build()
if err != nil {
return err
}

messagesContract, err := messagesContract(cfg, client)
if err != nil {
return err
}

indexLogs(
go indexLogs(
ctx,
messagesChannel,
logger.Named("indexLogs").With(zap.String("contractAddress", cfg.MessagesContractAddress)),
storer.NewGroupMessageStorer(queries, logger, messagesContract),
)

streamer, err := builder.Build()
if err != nil {
return err
}

return streamer.Start(ctx)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/indexer/storer/groupMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogS
}

// TODO:nm figure out topic structure
topic := buildTopic(msgSent.GroupId)
topic := BuildGroupMessageTopic(msgSent.GroupId)

s.logger.Debug("Inserting message from contract", zap.String("topic", topic))

Expand All @@ -50,7 +50,7 @@ func (s *GroupMessageStorer) StoreLog(ctx context.Context, event types.Log) LogS
return nil
}

func buildTopic(groupId [32]byte) string {
func BuildGroupMessageTopic(groupId [32]byte) string {
// We should think about simplifying the topics, since backwards compatibility shouldn't really matter here
return fmt.Sprintf("/xmtp/1/g-%x/proto", groupId)
}
5 changes: 2 additions & 3 deletions pkg/indexer/storer/groupMessage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/abis"
"github.com/xmtp/xmtpd/pkg/blockchain"
"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/indexer/blockchain"
"github.com/xmtp/xmtpd/pkg/testutils"
"github.com/xmtp/xmtpd/pkg/utils"
)
Expand Down Expand Up @@ -43,8 +43,7 @@ func TestStoreGroupMessages(t *testing.T) {
storer, cleanup := buildGroupMessageStorer(t)
defer cleanup()

var groupID [32]byte
copy(groupID[:], testutils.RandomBytes(32))
groupID := testutils.RandomGroupID()
message := testutils.RandomBytes(30)
sequenceID := uint64(1)

Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewReplicationServer(
ctx,
queries.New(s.writerDB),
nodeRegistry,
options.PrivateKeyString,
options.SigningKeyString,
)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewTestServer(
log := testutils.NewLog(t)

server, err := s.NewReplicationServer(context.Background(), log, config.ServerOptions{
PrivateKeyString: hex.EncodeToString(crypto.FromECDSA(privateKey)),
SigningKeyString: hex.EncodeToString(crypto.FromECDSA(privateKey)),
API: config.ApiOptions{
Port: 0,
},
Expand Down
11 changes: 11 additions & 0 deletions pkg/testutils/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (

const BLOCKCHAIN_RPC_URL = "http://localhost:7545"

// This is the private key that anvil has funded by default
// This is safe to commit
const TEST_PRIVATE_KEY = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"

type contractInfo struct {
DeployedTo string `json:"deployedTo"`
}
Expand Down Expand Up @@ -69,5 +73,12 @@ func GetContractsOptions(t *testing.T) config.ContractsOptions {
MessagesContractAddress: getDeployedTo(t, path.Join(rootDir, "./build/GroupMessages.json")),
NodesContractAddress: getDeployedTo(t, path.Join(rootDir, "./build/Nodes.json")),
RefreshInterval: 100 * time.Millisecond,
ChainID: 31337,
}
}

func GetPayerOptions(t *testing.T) config.PayerOptions {
return config.PayerOptions{
PrivateKey: TEST_PRIVATE_KEY,
}
}
Loading

0 comments on commit bbf7ce1

Please sign in to comment.