Skip to content

Commit

Permalink
add publish worker
Browse files Browse the repository at this point in the history
  • Loading branch information
richardhuaaa committed Aug 23, 2024
1 parent 6ce44be commit 16b2940
Show file tree
Hide file tree
Showing 11 changed files with 619 additions and 312 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,5 @@ We use [sqlc](https://docs.sqlc.dev/en/latest/index.html) to generate the code f
```sh
dev/generate
```

If needed, there is a sqlc [playground](https://play.sqlc.dev/p/f6eebe941750560934cefa943c77f63497debc828c487e8d1771fb6d83773246) for experimenting with how the query syntax translates into Go code.
107 changes: 107 additions & 0 deletions pkg/api/publishWorker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package api

import (
"context"
"database/sql"
"time"

"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/registrant"
"google.golang.org/protobuf/proto"
)

type PublishWorker struct {
ctx context.Context
listener <-chan []queries.StagedOriginatorEnvelope
registrant *registrant.Registrant
store *sql.DB
subscription db.DBSubscription[queries.StagedOriginatorEnvelope]
}

func StartPublishWorker(
ctx context.Context,
reg *registrant.Registrant,
store *sql.DB,
notifier <-chan bool,
) (*PublishWorker, error) {
query := func(lastSeenID int64, numRows int32) ([]queries.StagedOriginatorEnvelope, int64, error) {
results, err := queries.New(store).SelectStagedOriginatorEnvelopes(
ctx,
queries.SelectStagedOriginatorEnvelopesParams{
LastSeenID: lastSeenID,
NumRows: numRows,
},
)
if err != nil {
return nil, 0, err
}
if len(results) > 0 {
lastSeenID = results[len(results)-1].ID
}
return results, lastSeenID, nil
}
subscription := db.NewDBSubscription(
ctx,
query,
0, // lastSeenID
db.PollingOptions{Interval: time.Second, Notifier: notifier, NumRows: 100},
)
listener, err := subscription.Start()
if err != nil {
return nil, err
}

worker := &PublishWorker{
ctx: ctx,
subscription: *subscription,
listener: listener,
registrant: reg,
store: store,
}
go worker.start()

return worker, nil
}

func (p *PublishWorker) start() {
for {
select {
case <-p.ctx.Done():
return
case new_batch := <-p.listener:
for _, stagedEnv := range new_batch {
originatedEnv, err := p.registrant.SignStagedEnvelope(stagedEnv)
if err != nil {
panic("TODO(rich)")
}
originatedBytes, err := proto.Marshal(originatedEnv)
if err != nil {
panic("TODO(rich)")
}

q := queries.New(p.store)

// On unique constraint conflicts, no error is thrown, but numRows is 0
_, err = q.InsertGatewayEnvelope(
p.ctx,
queries.InsertGatewayEnvelopeParams{
OriginatorID: int32(p.registrant.NodeID()),
SequenceID: stagedEnv.ID,
Topic: stagedEnv.Topic,
OriginatorEnvelope: originatedBytes,
},
)
if err != nil {
panic("TODO(rich)")
}

// Ensure the row is deleted, even if a unique constraint conflict occurred above
_, err = q.DeleteStagedOriginatorEnvelope(context.Background(), stagedEnv.ID)
if err != nil {
panic("TODO(rich)")
}
}
}
}
}
97 changes: 80 additions & 17 deletions pkg/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,33 @@ import (
type Service struct {
message_api.UnimplementedReplicationApiServer

ctx context.Context
log *zap.Logger
registrant *registrant.Registrant
queries *queries.Queries
ctx context.Context
log *zap.Logger
notifyStagedPublish chan<- bool
registrant *registrant.Registrant
store *sql.DB
worker *PublishWorker
}

func NewReplicationApiService(
ctx context.Context,
log *zap.Logger,
registrant *registrant.Registrant,
writerDB *sql.DB,
store *sql.DB,
) (*Service, error) {
return &Service{ctx: ctx, log: log, registrant: registrant, queries: queries.New(writerDB)}, nil
notifier := make(chan bool, 1)
worker, err := StartPublishWorker(ctx, registrant, store, notifier)
if err != nil {
return nil, err
}
return &Service{
ctx: ctx,
log: log,
notifyStagedPublish: notifier,
registrant: registrant,
store: store,
worker: worker,
}, nil
}

func (s *Service) Close() {
Expand All @@ -54,32 +68,81 @@ func (s *Service) PublishEnvelope(
ctx context.Context,
req *message_api.PublishEnvelopeRequest,
) (*message_api.PublishEnvelopeResponse, error) {
payerEnv := req.GetPayerEnvelope()
clientBytes := payerEnv.GetUnsignedClientEnvelope()
sig := payerEnv.GetPayerSignature()
if (clientBytes == nil) || (sig == nil) {
return nil, status.Errorf(codes.InvalidArgument, "missing envelope or signature")
clientEnv, err := s.validatePayerInfo(req.GetPayerEnvelope())
if err != nil {
return nil, err
}
// TODO(rich): Verify payer signature
// TODO(rich): Verify all originators have synced past `last_originator_sids`
// TODO(rich): Check that the blockchain sequence ID is equal to the latest on the group
// TODO(rich): Perform any payload-specific validation (e.g. identity updates)

topic, err := s.validateClientInfo(clientEnv)
if err != nil {
return nil, err
}

// TODO(rich): If it is a commit, publish it to blockchain instead

payerBytes, err := proto.Marshal(payerEnv)
payerBytes, err := proto.Marshal(req.GetPayerEnvelope())
if err != nil {
return nil, status.Errorf(codes.Internal, "could not marshal envelope: %v", err)
}

stagedEnv, err := s.queries.InsertStagedOriginatorEnvelope(ctx, payerBytes)
stagedEnv, err := queries.New(s.store).
InsertStagedOriginatorEnvelope(ctx, queries.InsertStagedOriginatorEnvelopeParams{
Topic: topic,
PayerEnvelope: payerBytes,
})
if err != nil {
return nil, status.Errorf(codes.Internal, "could not insert staged envelope: %v", err)
}

select {
case s.notifyStagedPublish <- true:
default:
}

originatorEnv, err := s.registrant.SignStagedEnvelope(stagedEnv)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not sign envelope: %v", err)
}

return &message_api.PublishEnvelopeResponse{OriginatorEnvelope: originatorEnv}, nil
}

func (s *Service) validatePayerInfo(
payerEnv *message_api.PayerEnvelope,
) (*message_api.ClientEnvelope, error) {
clientBytes := payerEnv.GetUnsignedClientEnvelope()
sig := payerEnv.GetPayerSignature()
if (clientBytes == nil) || (sig == nil) {
return nil, status.Errorf(codes.InvalidArgument, "missing envelope or signature")
}
// TODO(rich): Verify payer signature

clientEnv := &message_api.ClientEnvelope{}
err := proto.Unmarshal(clientBytes, clientEnv)
if err != nil {
return nil, status.Errorf(
codes.InvalidArgument,
"could not unmarshal client envelope: %v",
err,
)
}

return clientEnv, nil
}

func (s *Service) validateClientInfo(clientEnv *message_api.ClientEnvelope) ([]byte, error) {
if clientEnv.GetAad().GetTargetOriginator() != uint32(s.registrant.NodeID()) {
return nil, status.Errorf(codes.InvalidArgument, "invalid target originator")
}

topic := clientEnv.GetAad().GetTargetTopic()
if len(topic) == 0 {
return nil, status.Errorf(codes.InvalidArgument, "missing target topic")
}

// TODO(rich): Verify all originators have synced past `last_originator_sids`
// TODO(rich): Check that the blockchain sequence ID is equal to the latest on the group
// TODO(rich): Perform any payload-specific validation (e.g. identity updates)

return topic, nil
}
41 changes: 38 additions & 3 deletions pkg/api/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"testing"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -40,15 +41,29 @@ func newTestService(t *testing.T) (*Service, *sql.DB, func()) {
}
}

func createClientEnvelopeBytes(t *testing.T) []byte {
clientEnv := &message_api.ClientEnvelope{
Payload: nil,
Aad: &message_api.AuthenticatedData{
TargetOriginator: 1,
TargetTopic: []byte{0x5},
LastOriginatorSids: []uint64{},
},
}
clientEnvBytes, err := proto.Marshal(clientEnv)
require.NoError(t, err)
return clientEnvBytes
}

func TestSimplePublish(t *testing.T) {
svc, _, cleanup := newTestService(t)
svc, db, cleanup := newTestService(t)
defer cleanup()

resp, err := svc.PublishEnvelope(
context.Background(),
&message_api.PublishEnvelopeRequest{
PayerEnvelope: &message_api.PayerEnvelope{
UnsignedClientEnvelope: []byte{0x5},
UnsignedClientEnvelope: createClientEnvelopeBytes(t),
PayerSignature: &associations.RecoverableEcdsaSignature{},
},
},
Expand All @@ -61,7 +76,27 @@ func TestSimplePublish(t *testing.T) {
t,
proto.Unmarshal(resp.GetOriginatorEnvelope().GetUnsignedOriginatorEnvelope(), unsignedEnv),
)
require.Equal(t, uint8(0x5), unsignedEnv.GetPayerEnvelope().GetUnsignedClientEnvelope()[0])
clientEnv := &message_api.ClientEnvelope{}
require.NoError(
t,
proto.Unmarshal(unsignedEnv.GetPayerEnvelope().GetUnsignedClientEnvelope(), clientEnv),
)
require.Equal(t, uint8(0x5), clientEnv.Aad.GetTargetTopic()[0])

// Check that the envelope was published to the database after a delay
require.Eventually(t, func() bool {
envs, err := queries.New(db).
SelectGatewayEnvelopes(context.Background(), queries.SelectGatewayEnvelopesParams{})
require.NoError(t, err)

if len(envs) != 1 {
return false
}

originatorEnv := &message_api.OriginatorEnvelope{}
require.NoError(t, proto.Unmarshal(envs[0].OriginatorEnvelope, originatorEnv))
return proto.Equal(originatorEnv, resp.GetOriginatorEnvelope())
}, 500*time.Millisecond, 50*time.Millisecond)

// TODO(rich) Test that the published envelope is retrievable via the query API
}
17 changes: 16 additions & 1 deletion pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,26 @@ WHERE
SELECT
insert_gateway_envelope(@originator_id, @sequence_id, @topic, @originator_envelope);

-- name: SelectGatewayEnvelopes :many
SELECT
*
FROM
gateway_envelopes
WHERE (sqlc.narg('topic')::BYTEA IS NULL
OR topic = @topic)
AND (sqlc.narg('originator_node_id')::INT IS NULL
OR originator_node_id = @originator_node_id)
AND (sqlc.narg('originator_sequence_id')::BIGINT IS NULL
OR originator_sequence_id > @originator_sequence_id)
AND (sqlc.narg('gateway_sequence_id')::BIGINT IS NULL
OR id > @gateway_sequence_id)
LIMIT sqlc.narg('row_limit')::INT;

-- name: InsertStagedOriginatorEnvelope :one
SELECT
*
FROM
insert_staged_originator_envelope(@payer_envelope);
insert_staged_originator_envelope(@topic, @payer_envelope);

-- name: SelectStagedOriginatorEnvelopes :many
SELECT
Expand Down
11 changes: 6 additions & 5 deletions pkg/db/queries/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 16b2940

Please sign in to comment.