Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a crazy amount of logging #353

Merged
merged 3 commits into from
Feb 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (s *Server) startGRPC() error {
wakuMsg := wakuEnv.Message()

if topic.IsMLSV1(wakuMsg.ContentTopic) {
s.Log.Info("handling waku relay message in mlsv1 service", zap.String("topic", wakuMsg.ContentTopic))
if s.mlsv1 != nil {
err := s.mlsv1.HandleIncomingWakuRelayMessage(wakuEnv.Message())
if err != nil {
Expand Down
26 changes: 22 additions & 4 deletions pkg/mls/api/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (s *Service) Close() {

func (s *Service) HandleIncomingWakuRelayMessage(wakuMsg *wakupb.WakuMessage) error {
if topic.IsMLSV1Group(wakuMsg.ContentTopic) {
s.log.Info("received group message from waku relay", zap.String("topic", wakuMsg.ContentTopic))
var msg mlsv1.GroupMessage
err := pb.Unmarshal(wakuMsg.Payload, &msg)
if err != nil {
Expand All @@ -96,11 +97,14 @@ func (s *Service) HandleIncomingWakuRelayMessage(wakuMsg *wakupb.WakuMessage) er
if msg.GetV1() == nil {
return nil
}
err = s.nc.Publish(buildNatsSubjectForGroupMessages(msg.GetV1().GroupId), wakuMsg.Payload)
natsSubject := buildNatsSubjectForWelcomeMessages(msg.GetV1().GroupId)
s.log.Info("publishing to nats subject from relay", zap.String("subject", natsSubject))
err = s.nc.Publish(natsSubject, wakuMsg.Payload)
if err != nil {
return err
}
} else if topic.IsMLSV1Welcome(wakuMsg.ContentTopic) {
s.log.Info("received welcome message from waku relay", zap.String("topic", wakuMsg.ContentTopic))
var msg mlsv1.WelcomeMessage
err := pb.Unmarshal(wakuMsg.Payload, &msg)
if err != nil {
Expand All @@ -109,7 +113,9 @@ func (s *Service) HandleIncomingWakuRelayMessage(wakuMsg *wakupb.WakuMessage) er
if msg.GetV1() == nil {
return nil
}
err = s.nc.Publish(buildNatsSubjectForWelcomeMessages(msg.GetV1().InstallationKey), wakuMsg.Payload)
natsSubject := buildNatsSubjectForWelcomeMessages(msg.GetV1().InstallationKey)
s.log.Info("publishing to nats subject from relay", zap.String("subject", natsSubject))
err = s.nc.Publish(natsSubject, wakuMsg.Payload)
if err != nil {
return err
}
Expand Down Expand Up @@ -231,6 +237,7 @@ func (s *Service) GetIdentityUpdates(ctx context.Context, req *mlsv1.GetIdentity
}

func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMessagesRequest) (res *emptypb.Empty, err error) {
log := s.log.Named("send-group-messages")
if err = validateSendGroupMessagesRequest(req); err != nil {
return nil, err
}
Expand All @@ -240,21 +247,25 @@ func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMes
// TODO: Separate validation errors from internal errors
return nil, status.Errorf(codes.InvalidArgument, "invalid group message: %s", err)
}
log.Info("validated group messages", zap.Int("count", len(validationResults)))

for i, result := range validationResults {
input := req.Messages[i]

if err = requireReadyToSend(result.GroupId, input.GetV1().Data); err != nil {
log.Warn("invalid group message", zap.Error(err))
return nil, err
}

// TODO: Wrap this in a transaction so publishing is all or nothing
decodedGroupId, err := hex.DecodeString(result.GroupId)
if err != nil {
log.Warn("invalid group id", zap.Error(err))
return nil, status.Error(codes.InvalidArgument, "invalid group id")
}
msg, err := s.store.InsertGroupMessage(ctx, decodedGroupId, input.GetV1().Data)
if err != nil {
log.Warn("error inserting message", zap.Error(err))
if mlsstore.IsAlreadyExistsError(err) {
continue
}
Expand All @@ -272,18 +283,22 @@ func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMes
},
})
if err != nil {
log.Error("error serializing message", zap.Error(err))
return nil, err
}

contentTopic := topic.BuildMLSV1GroupTopic(decodedGroupId)

err = s.publishToWakuRelay(ctx, &wakupb.WakuMessage{
ContentTopic: topic.BuildMLSV1GroupTopic(decodedGroupId),
ContentTopic: contentTopic,
Timestamp: msg.CreatedAt.UnixNano(),
Payload: msgB,
})

if err != nil {
log.Error("error publishing to waku message", zap.Error(err), zap.String("contentTopic", contentTopic))
return nil, err
}
log.Info("published to waku relay", zap.String("contentTopic", contentTopic))
}

return &emptypb.Empty{}, nil
Expand Down Expand Up @@ -342,13 +357,15 @@ func (s *Service) QueryWelcomeMessages(ctx context.Context, req *mlsv1.QueryWelc
func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesRequest, stream mlsv1.MlsApi_SubscribeGroupMessagesServer) error {
log := s.log.Named("subscribe-group-messages").With(zap.Int("filters", len(req.Filters)))
log.Info("subscription started")
defer log.Info("subscription ended")
// Send a header (any header) to fix an issue with Tonic based GRPC clients.
// See: https://github.com/xmtp/libxmtp/pull/58
_ = stream.SendHeader(metadata.Pairs("subscribed", "true"))

var streamLock sync.Mutex
for _, filter := range req.Filters {
natsSubject := buildNatsSubjectForGroupMessages(filter.GroupId)
log.Info("subscribing to nats subject", zap.String("subject", natsSubject), zap.String("group_id", hex.EncodeToString(filter.GroupId)))
sub, err := s.nc.Subscribe(natsSubject, func(natsMsg *nats.Msg) {
log.Info("received message from nats")
var msg mlsv1.GroupMessage
Expand Down Expand Up @@ -386,6 +403,7 @@ func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesReques
func (s *Service) SubscribeWelcomeMessages(req *mlsv1.SubscribeWelcomeMessagesRequest, stream mlsv1.MlsApi_SubscribeWelcomeMessagesServer) error {
log := s.log.Named("subscribe-welcome-messages").With(zap.Int("filters", len(req.Filters)))
log.Info("subscription started")
defer log.Info("subscription ended")
// Send a header (any header) to fix an issue with Tonic based GRPC clients.
// See: https://github.com/xmtp/libxmtp/pull/58
_ = stream.SendHeader(metadata.Pairs("subscribed", "true"))
Expand Down
Loading