Skip to content

Commit

Permalink
Implement query endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
richardhuaaa committed Aug 30, 2024
1 parent ac08512 commit ffa6eca
Show file tree
Hide file tree
Showing 15 changed files with 417 additions and 86 deletions.
10 changes: 7 additions & 3 deletions pkg/api/publishWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,15 @@ func (p *PublishWorker) publishStagedEnvelope(stagedEnv queries.StagedOriginator
inserted, err := q.InsertGatewayEnvelope(
p.ctx,
queries.InsertGatewayEnvelopeParams{
OriginatorID: int32(p.registrant.NodeID()),
OriginatorNodeID: int32(p.registrant.NodeID()),
OriginatorSequenceID: stagedEnv.ID,
Topic: stagedEnv.Topic,
OriginatorEnvelope: originatorBytes,
},
)
if err != nil {
if p.ctx.Err() != nil {
return false
} else if err != nil {
logger.Error("Failed to insert gateway envelope", zap.Error(err))
return false
} else if inserted == 0 {
Expand All @@ -134,7 +136,9 @@ func (p *PublishWorker) publishStagedEnvelope(stagedEnv queries.StagedOriginator

// Try to delete the row regardless of if the gateway envelope was inserted elsewhere
deleted, err := q.DeleteStagedOriginatorEnvelope(context.Background(), stagedEnv.ID)
if err != nil {
if p.ctx.Err() != nil {
return true
} else if err != nil {
logger.Error("Failed to delete staged envelope", zap.Error(err))
// Envelope is already inserted, so it is safe to continue
return true
Expand Down
85 changes: 84 additions & 1 deletion pkg/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@ import (
"context"
"database/sql"

"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/message_api"
"github.com/xmtp/xmtpd/pkg/registrant"
"github.com/xmtp/xmtpd/pkg/utils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"go.uber.org/zap"
)

const (
maxRequestedRows int32 = 1000
)

type Service struct {
message_api.UnimplementedReplicationApiServer

Expand Down Expand Up @@ -58,7 +64,84 @@ func (s *Service) QueryEnvelopes(
ctx context.Context,
req *message_api.QueryEnvelopesRequest,
) (*message_api.QueryEnvelopesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method QueryEnvelopes not implemented")
params, err := s.queryReqToDBParams(req)
if err != nil {
return nil, err
}

rows, err := queries.New(s.store).SelectGatewayEnvelopes(ctx, *params)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not select envelopes: %v", err)
}

envs := make([]*message_api.GatewayEnvelope, 0, len(rows))
for _, row := range rows {
originatorEnv := &message_api.OriginatorEnvelope{}
err := proto.Unmarshal(row.OriginatorEnvelope, originatorEnv)
if err != nil {
// We expect to have already validated the envelope when it was inserted
s.log.Error("could not unmarshal originator envelope", zap.Error(err))
continue
}
gatewaySID, err := s.registrant.SID(row.ID)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not generate SID for envelope")
}
envs = append(envs, &message_api.GatewayEnvelope{
GatewaySid: gatewaySID,
OriginatorEnvelope: originatorEnv,
})
}

return &message_api.QueryEnvelopesResponse{
Envelopes: envs,
}, nil
}

func (s *Service) queryReqToDBParams(
req *message_api.QueryEnvelopesRequest,
) (*queries.SelectGatewayEnvelopesParams, error) {
params := queries.SelectGatewayEnvelopesParams{
Topic: []byte{},
OriginatorNodeID: sql.NullInt32{},
OriginatorSequenceID: sql.NullInt64{},
GatewaySequenceID: sql.NullInt64{},
RowLimit: db.NullInt32(maxRequestedRows),
}

query := req.GetQuery()
if query == nil {
return nil, status.Errorf(codes.InvalidArgument, "missing query")
}

switch filter := query.GetFilter().(type) {
case *message_api.EnvelopesQuery_Topic:
params.Topic = filter.Topic
case *message_api.EnvelopesQuery_OriginatorId:
params.OriginatorNodeID = db.NullInt32(int32(filter.OriginatorId))
default:
}

switch lastSeen := query.GetLastSeen().(type) {
case *message_api.EnvelopesQuery_GatewaySid:
if utils.NodeID(lastSeen.GatewaySid) != s.registrant.NodeID() {
return nil, status.Errorf(codes.InvalidArgument, "wrong gateway SID")
}
params.GatewaySequenceID = db.NullInt64(utils.SequenceID(lastSeen.GatewaySid))
case *message_api.EnvelopesQuery_OriginatorSid:
// TODO(rich): Properly handle clients switching between nodes. This filters on the
// node ID which is not what we want.
params.OriginatorNodeID = db.NullInt32(int32(utils.NodeID(lastSeen.OriginatorSid)))
params.OriginatorSequenceID = db.NullInt64(utils.SequenceID(lastSeen.OriginatorSid))
default:
}

limit := int32(req.GetLimit())
if limit > 0 && limit <= maxRequestedRows {
params.RowLimit = db.NullInt32(limit)
}

return &params, nil
}

func (s *Service) PublishEnvelope(
Expand Down
Loading

0 comments on commit ffa6eca

Please sign in to comment.