Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add nats stream name env variable #4

Merged
merged 1 commit into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .envrc.sample
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ export WORMHOLE_NETWORK_ID=/wormhole/mainnet/2
export WORMHOLE_BOOTSTRAP=/dns4/wormhole-mainnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWQp644DK27fd3d4Km3jr7gHiuJJ5ZGmy8hH4py7fP4FP7,/dns4/wormhole-v2-mainnet-bootstrap.xlabs.xyz/udp/8999/quic/p2p/12D3KooWNQ9tVrcb64tw6bNs2CaNrUGPM7yRrKvBBheQ5yCyPHKC
export WORMHOLE_LISTEN=/ip4/0.0.0.0/udp/30910/quic
export NATS_URL=127.0.0.1:4222
export NATS_STREAM=mainnet-vaas
export SERVER_URL=127.0.0.1:7072
export LOG_LEVEL=info
export WRITER_BATCH_SIZE="1000"
Expand Down
7 changes: 3 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var cli struct {
WormholeBootstrap []string `kong:"required,env='WORMHOLE_BOOTSTRAP',help='Bootstrap nodes to connect to.'"`
WormholeListen []string `kong:"required,env='WORMHOLE_LISTEN',help='Addresses to listen on'"`
ServerURL string `kong:"required,env='SERVER_URL',help='gRPC server URL to bind'"`
NatsStream string `kong:"required,env='NATS_STREAM',help='NATS stream to use'"`
NatsURL string `kong:"required,env='NATS_URL',help='NATS URL to connect'"`
WriterBatchSize int `kong:"required,env='WRITER_BATCH_SIZE',default=100,help='Number of messages to batch'"`
LogLevel string `kong:"required,env='LOG_LEVEL',default=info,help='Log level'"`
Expand All @@ -28,8 +29,6 @@ var cli struct {
HeartbeatInterval int `kong:"required,env='HEARTBEAT_INTERVAL',default='10',help='Maximum time between heartbeats in seconds'"`
}

const STREAM_NAME = "VAAS"

type Heartbeat struct {
// Timestamp is updated from the ReceiveMessages thread
Timestamp int64
Expand Down Expand Up @@ -84,8 +83,8 @@ func main() {

log.Info().Msg("Starting receive/write/serve goroutines")
go ReceiveMessages(channel, heartbeat, cli.WormholeNetworkID, cli.WormholeBootstrap, cli.WormholeListen)
go WriteMessages(channel, cli.NatsURL, cli.WriterBatchSize)
go ServeMessages(cli.ServerURL, cli.NatsURL)
go WriteMessages(channel, cli.NatsURL, cli.NatsStream, cli.WriterBatchSize)
go ServeMessages(cli.ServerURL, cli.NatsURL, cli.NatsStream)

// Interrupt on CTRL-C
done := make(chan os.Signal, 1)
Expand Down
9 changes: 5 additions & 4 deletions serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ type filterSignedVaa struct {

type server struct {
spyv1.UnimplementedSpyRPCServiceServer
natsConn *nats.Conn
natsConn *nats.Conn
natsStream string
}

func (s server) SubscribeSignedVAA(req *spyv1.SubscribeSignedVAARequest, server spyv1.SpyRPCService_SubscribeSignedVAAServer) error {
Expand Down Expand Up @@ -54,7 +55,7 @@ func (s server) SubscribeSignedVAA(req *spyv1.SubscribeSignedVAARequest, server
log.Panic().Err(err).Msg("Failed to create stream context")
}

stream, err := js.Stream(ctx, STREAM_NAME)
thmzlt marked this conversation as resolved.
Show resolved Hide resolved
stream, err := js.Stream(ctx, s.natsStream)

if err != nil {
log.Panic().Err(err).Msg("Failed to create stream object")
Expand Down Expand Up @@ -119,7 +120,7 @@ func (s server) SubscribeSignedVAA(req *spyv1.SubscribeSignedVAARequest, server
}
}

func ServeMessages(serverURL, natsURL string) {
func ServeMessages(serverURL, natsURL string, natsStream string) {
nc, err := nats.Connect(natsURL)

if err != nil {
Expand All @@ -133,7 +134,7 @@ func ServeMessages(serverURL, natsURL string) {
}

grpcServer := grpc.NewServer()
server := server{natsConn: nc}
server := server{natsConn: nc, natsStream: natsStream}

reflection.Register(grpcServer)
spyv1.RegisterSpyRPCServiceServer(grpcServer, server)
Expand Down
10 changes: 5 additions & 5 deletions write.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)

func WriteMessages(channel chan *vaa.VAA, natsURL string, batchSize int) {
func WriteMessages(channel chan *vaa.VAA, natsURL string, natsStream string, batchSize int) {
nc, err := nats.Connect(natsURL)

if err != nil {
Expand All @@ -24,16 +24,16 @@ func WriteMessages(channel chan *vaa.VAA, natsURL string, batchSize int) {
log.Panic().Err(err).Msg("Failed to create stream context")
}

stream, _ := js.StreamInfo(STREAM_NAME)
stream, _ := js.StreamInfo(natsStream)

if stream == nil {
_, err = js.AddStream(&nats.StreamConfig{
Name: STREAM_NAME,
Name: natsStream,
MaxBytes: 4_000_000_000,
})

if err != nil {
log.Panic().Err(err).Str("stream", STREAM_NAME).Msg("Failed to create stream")
log.Panic().Err(err).Str("stream", natsStream).Msg("Failed to create stream")
}
}

Expand All @@ -57,7 +57,7 @@ func WriteMessages(channel chan *vaa.VAA, natsURL string, batchSize int) {
signingDigest := message.SigningDigest().Hex()

_, err = js.PublishMsgAsync(&nats.Msg{
Subject: STREAM_NAME,
Subject: natsStream,
Header: nats.Header{"Nats-Msg-Id": []string{signingDigest}},
Data: bytes,
})
Expand Down
Loading