Skip to content

Commit

Permalink
Validate node ID, sequence ID and timestamp on synced payloads (#311)
Browse files Browse the repository at this point in the history
This adds initial validation on synced payloads.

Still to be done:

- Misbehavior reporting system
- Payload-specific validation

#302 

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
  - Enhanced query logic to ensure unique results for originator nodes.
  - Added a method to access the `OriginatorNs` value in the `OriginatorEnvelope` struct.
  - Improved stream management and envelope processing in the sync worker.

- **Bug Fixes**
  - Refined error handling for stream connections and envelope validation.

- **Refactor**
  - Restructured stream and envelope handling to improve data encapsulation and processing logic.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
richardhuaaa authored Dec 19, 2024
1 parent 1911fb9 commit f223c36
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 46 deletions.
10 changes: 6 additions & 4 deletions pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ DELETE FROM staged_originator_envelopes
WHERE id = @id;

-- name: SelectVectorClock :many
SELECT
SELECT DISTINCT ON (originator_node_id)
originator_node_id,
max(originator_sequence_id)::BIGINT AS originator_sequence_id
originator_sequence_id,
originator_envelope
FROM
gateway_envelopes
GROUP BY
originator_node_id;
ORDER BY
originator_node_id,
originator_sequence_id DESC;

-- name: GetAddressLogs :many
SELECT
Expand Down
13 changes: 8 additions & 5 deletions pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/envelopes/originator.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (o *OriginatorEnvelope) OriginatorSequenceID() uint64 {
return o.UnsignedOriginatorEnvelope.OriginatorSequenceID()
}

// OriginatorNs returns the value reported by the wrapped
// UnsignedOriginatorEnvelope's OriginatorNs method.
// NOTE(review): presumably the originator's timestamp in nanoseconds — confirm
// against UnsignedOriginatorEnvelope.
func (o *OriginatorEnvelope) OriginatorNs() int64 {
return o.UnsignedOriginatorEnvelope.OriginatorNs()
}

// TargetTopic returns the topic reported by the wrapped
// UnsignedOriginatorEnvelope's TargetTopic method.
func (o *OriginatorEnvelope) TargetTopic() topic.Topic {
return o.UnsignedOriginatorEnvelope.TargetTopic()
}
98 changes: 61 additions & 37 deletions pkg/sync/syncWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/db/queries"
envUtils "github.com/xmtp/xmtpd/pkg/envelopes"
clientInterceptors "github.com/xmtp/xmtpd/pkg/interceptors/client"
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes"
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/message_api"
Expand All @@ -18,7 +19,6 @@ import (
"github.com/xmtp/xmtpd/pkg/tracing"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)

type syncWorker struct {
Expand All @@ -33,6 +33,12 @@ type syncWorker struct {
cancel context.CancelFunc
}

// originatorStream pairs a SubscribeEnvelopes client stream with the state
// needed to validate the envelopes received on it.
type originatorStream struct {
// nodeID is the originator node the stream was subscribed for; envelopes
// reporting a different originator node ID are logged and dropped.
nodeID uint32
// lastEnvelope is the envelope with the highest sequence ID seen so far for
// nodeID. It is seeded from the stored vector-clock row when the stream is
// set up and is nil when no envelope from this node is known yet.
lastEnvelope *envUtils.OriginatorEnvelope
// stream is the client stream that batches of envelopes are received from.
stream message_api.ReplicationApi_SubscribeEnvelopesClient
}

// ExitLoopError is an error value carrying a human-readable message.
// NOTE(review): presumably returned to tell a retry/subscription loop to stop
// instead of retrying — confirm against the callers, which are not visible here.
type ExitLoopError struct {
// Message is the text describing why the error was raised.
Message string
}
Expand Down Expand Up @@ -167,7 +173,7 @@ func (s *syncWorker) subscribeToNodeRegistration(
}

var conn *grpc.ClientConn
var stream message_api.ReplicationApi_SubscribeEnvelopesClient
var stream *originatorStream
err = nil

// TODO(mkysel) we should eventually implement a better backoff strategy
Expand Down Expand Up @@ -270,7 +276,7 @@ func (s *syncWorker) setupStream(
ctx context.Context,
node registry.Node,
conn *grpc.ClientConn,
) (message_api.ReplicationApi_SubscribeEnvelopesClient, error) {
) (*originatorStream, error) {
result, err := queries.New(s.store).SelectVectorClock(ctx)
if err != nil {
return nil, err
Expand All @@ -282,11 +288,12 @@ func (s *syncWorker) setupStream(
zap.Any("vc", vc),
)
client := message_api.NewReplicationApiClient(conn)
nodeID := node.NodeID
stream, err := client.SubscribeEnvelopes(
ctx,
&message_api.SubscribeEnvelopesRequest{
Query: &message_api.EnvelopesQuery{
OriginatorNodeIds: []uint32{node.NodeID},
OriginatorNodeIds: []uint32{nodeID},
LastSeen: &envelopes.VectorClock{
NodeIdToSequenceId: vc,
},
Expand All @@ -299,15 +306,25 @@ func (s *syncWorker) setupStream(
err,
)
}
return stream, nil
originatorStream := &originatorStream{nodeID: nodeID, stream: stream}
for _, row := range result {
if uint32(row.OriginatorNodeID) == nodeID {
lastEnvelope, err := envUtils.NewOriginatorEnvelopeFromBytes(row.OriginatorEnvelope)
if err != nil {
return nil, err
}
originatorStream.lastEnvelope = lastEnvelope
}
}
return originatorStream, nil
}

func (s *syncWorker) listenToStream(
stream message_api.ReplicationApi_SubscribeEnvelopesClient,
originatorStream *originatorStream,
) error {
for {
// Recv() is a blocking operation that can only be interrupted by cancelling ctx
envs, err := stream.Recv()
envs, err := originatorStream.stream.Recv()
if err == io.EOF {
return fmt.Errorf("Stream closed with EOF")
}
Expand All @@ -318,55 +335,62 @@ func (s *syncWorker) listenToStream(
}
s.log.Debug("Received envelopes", zap.Any("numEnvelopes", len(envs.Envelopes)))
for _, env := range envs.Envelopes {
s.insertEnvelope(env)
s.validateAndInsertEnvelope(originatorStream, env)
}
}

}

func (s *syncWorker) insertEnvelope(env *envelopes.OriginatorEnvelope) {
s.log.Debug("Replication server received envelope", zap.Any("envelope", env))
// TODO(nm) Validation logic - share code with API service and publish worker
originatorBytes, err := proto.Marshal(env)
func (s *syncWorker) validateAndInsertEnvelope(
stream *originatorStream,
envProto *envelopes.OriginatorEnvelope,
) {
env, err := envUtils.NewOriginatorEnvelope(envProto)
if err != nil {
s.log.Error("Failed to marshal originator envelope", zap.Error(err))
s.log.Error("Failed to unmarshal originator envelope", zap.Error(err))
return
}

unsignedEnvelope := &envelopes.UnsignedOriginatorEnvelope{}
err = proto.Unmarshal(env.GetUnsignedOriginatorEnvelope(), unsignedEnvelope)
if err != nil {
s.log.Error(
"Failed to unmarshal unsigned originator envelope",
zap.Error(err),
)
if env.OriginatorNodeID() != stream.nodeID {
s.log.Error("Received envelope from wrong node", zap.Any("nodeID", env.OriginatorNodeID()))
return
}

clientEnvelope := &envelopes.ClientEnvelope{}
err = proto.Unmarshal(
unsignedEnvelope.GetPayerEnvelope().GetUnsignedClientEnvelope(),
clientEnvelope,
)
var lastSequenceID uint64 = 0
var lastNs int64 = 0
if stream.lastEnvelope != nil {
lastSequenceID = stream.lastEnvelope.OriginatorSequenceID()
lastNs = stream.lastEnvelope.OriginatorNs()
}
if env.OriginatorSequenceID() != lastSequenceID+1 || env.OriginatorNs() < lastNs {
// TODO(rich) Submit misbehavior report and continue
s.log.Error("Received out of order envelope")
}

if env.OriginatorSequenceID() > lastSequenceID {
stream.lastEnvelope = env
}

// TODO Validation logic - share code with API service and publish worker
// Signatures, topic type, etc
s.insertEnvelope(env)
}

func (s *syncWorker) insertEnvelope(env *envUtils.OriginatorEnvelope) {
s.log.Debug("Replication server received envelope", zap.Any("envelope", env))
originatorBytes, err := env.Bytes()
if err != nil {
s.log.Error(
"Failed to unmarshal client envelope",
zap.Error(err),
)
s.log.Error("Failed to marshal originator envelope", zap.Error(err))
return
}

q := queries.New(s.store)

inserted, err := q.InsertGatewayEnvelope(
s.ctx,
queries.InsertGatewayEnvelopeParams{
OriginatorNodeID: int32(unsignedEnvelope.GetOriginatorNodeId()),
OriginatorSequenceID: int64(
unsignedEnvelope.GetOriginatorSequenceId(),
),
Topic: clientEnvelope.GetAad().GetTargetTopic(),
OriginatorEnvelope: originatorBytes,
OriginatorNodeID: int32(env.OriginatorNodeID()),
OriginatorSequenceID: int64(env.OriginatorSequenceID()),
Topic: env.TargetTopic().Bytes(),
OriginatorEnvelope: originatorBytes,
},
)
if err != nil {
Expand Down

0 comments on commit f223c36

Please sign in to comment.