diff --git a/pkg/replication/api/server.go b/pkg/replication/api/server.go index 1fa871ce..fc2969ac 100644 --- a/pkg/replication/api/server.go +++ b/pkg/replication/api/server.go @@ -29,6 +29,7 @@ type ApiServer struct { func NewAPIServer(ctx context.Context, log *zap.Logger, port int) (*ApiServer, error) { grpcListener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port)) + if err != nil { return nil, err } @@ -74,6 +75,10 @@ func NewAPIServer(ctx context.Context, log *zap.Logger, port int) (*ApiServer, e return s, nil } +func (s *ApiServer) Addr() net.Addr { + return s.grpcListener.Addr() +} + func isErrUseOfClosedConnection(err error) bool { return strings.Contains(err.Error(), "use of closed network connection") } diff --git a/pkg/replication/options.go b/pkg/replication/options.go index 6667a251..95f536c3 100644 --- a/pkg/replication/options.go +++ b/pkg/replication/options.go @@ -14,6 +14,8 @@ type Options struct { //nolint:staticcheck LogEncoding string `long:"log-encoding" description:"Log encoding format. Either console or json" choice:"console" choice:"json" default:"console"` + PrivateKeyString string `long:"private-key" description:"Private key to use for the node"` + API ApiOptions `group:"API Options" namespace:"api"` DB DbOptions `group:"Database Options" namespace:"db"` } diff --git a/pkg/replication/registry/registry.go b/pkg/replication/registry/registry.go new file mode 100644 index 00000000..56a33cbc --- /dev/null +++ b/pkg/replication/registry/registry.go @@ -0,0 +1,32 @@ +package registry + +type Node struct { + Index int + PublicKey []byte + GrpcAddress string + DisabledBlock *uint64 + // Maybe add mTLS cert here +} + +type NodeRegistry interface { + GetNodes() ([]Node, error) + // OnChange() +} + +// TODO: Delete this or move to a test file + +type FixedNodeRegistry struct { + nodes []Node +} + +func NewFixedNodeRegistry(nodes []Node) *FixedNodeRegistry { + return &FixedNodeRegistry{nodes: nodes} +} + +func (r *FixedNodeRegistry) GetNodes() ([]Node, error) { + return r.nodes, nil +} + +func (f *FixedNodeRegistry) AddNode(node Node) { + f.nodes = append(f.nodes, node) +} diff --git a/pkg/replication/server.go b/pkg/replication/server.go index 2520b9b4..bcd0952e 100644 --- a/pkg/replication/server.go +++ b/pkg/replication/server.go @@ -2,28 +2,38 @@ package replication import ( "context" + "crypto/ecdsa" + "net" "os" "os/signal" "syscall" + "github.com/ethereum/go-ethereum/crypto" "github.com/xmtp/xmtp-node-go/pkg/replication/api" + "github.com/xmtp/xmtp-node-go/pkg/replication/registry" "go.uber.org/zap" ) type Server struct { - options Options - log *zap.Logger - ctx context.Context - cancel context.CancelFunc - apiServer *api.ApiServer + options Options + log *zap.Logger + ctx context.Context + cancel context.CancelFunc + apiServer *api.ApiServer + nodeRegistry registry.NodeRegistry + privateKey *ecdsa.PrivateKey } -func New(ctx context.Context, log *zap.Logger, options Options) (*Server, error) { +func New(ctx context.Context, log *zap.Logger, options Options, nodeRegistry registry.NodeRegistry) (*Server, error) { var err error - s := &Server{ - options: options, - log: log, + options: options, + log: log, + nodeRegistry: nodeRegistry, + } + s.privateKey, err = parsePrivateKey(options.PrivateKeyString) + if err != nil { + return nil, err } s.ctx, s.cancel = context.WithCancel(ctx) @@ -35,6 +45,10 @@ func New(ctx context.Context, log *zap.Logger, options Options) (*Server, error) return s, nil } +func (s *Server) Addr() net.Addr { + return s.apiServer.Addr() +} + func (s *Server) WaitForShutdown() { termChannel := make(chan os.Signal, 1) signal.Notify(termChannel, syscall.SIGINT, syscall.SIGTERM) @@ -44,5 +58,8 @@ func (s *Server) WaitForShutdown() { func (s *Server) Shutdown() { s.cancel() +} +func parsePrivateKey(privateKeyString string) (*ecdsa.PrivateKey, error) { + return crypto.HexToECDSA(privateKeyString) } diff --git a/pkg/replication/server_test.go b/pkg/replication/server_test.go new file mode 100644 index 00000000..82c92823 --- /dev/null +++ b/pkg/replication/server_test.go @@ -0,0 +1,35 @@ +package replication + +import ( + "context" + "encoding/hex" + "testing" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/stretchr/testify/require" + "github.com/xmtp/xmtp-node-go/pkg/replication/registry" + test "github.com/xmtp/xmtp-node-go/pkg/testing" +) + +func NewTestServer(t *testing.T, registry registry.NodeRegistry) *Server { + log := test.NewLog(t) + privateKey, err := crypto.GenerateKey() + require.NoError(t, err) + + server, err := New(context.Background(), log, Options{ + PrivateKeyString: hex.EncodeToString(crypto.FromECDSA(privateKey)), + API: ApiOptions{ + Port: 0, + }, + }, registry) + require.NoError(t, err) + + return server +} + +func TestCreateServer(t *testing.T) { + registry := registry.NewFixedNodeRegistry([]registry.Node{}) + server1 := NewTestServer(t, registry) + server2 := NewTestServer(t, registry) + require.NotEqual(t, server1.Addr(), server2.Addr()) +}