diff --git a/pkg/api/server.go b/pkg/api/server.go index 267804a2..e4aaf3fe 100644 --- a/pkg/api/server.go +++ b/pkg/api/server.go @@ -165,6 +165,7 @@ func (s *Server) startGRPC() error { wakuMsg := wakuEnv.Message() if topic.IsMLSV1(wakuMsg.ContentTopic) { + s.Log.Info("handling waku relay message in mlsv1 service", zap.String("topic", wakuMsg.ContentTopic)) if s.mlsv1 != nil { err := s.mlsv1.HandleIncomingWakuRelayMessage(wakuEnv.Message()) if err != nil { diff --git a/pkg/mls/api/v1/service.go b/pkg/mls/api/v1/service.go index de6cd901..e0648371 100644 --- a/pkg/mls/api/v1/service.go +++ b/pkg/mls/api/v1/service.go @@ -88,6 +88,7 @@ func (s *Service) Close() { func (s *Service) HandleIncomingWakuRelayMessage(wakuMsg *wakupb.WakuMessage) error { if topic.IsMLSV1Group(wakuMsg.ContentTopic) { + s.log.Info("received group message from waku relay", zap.String("topic", wakuMsg.ContentTopic)) var msg mlsv1.GroupMessage err := pb.Unmarshal(wakuMsg.Payload, &msg) if err != nil { @@ -96,11 +97,14 @@ func (s *Service) HandleIncomingWakuRelayMessage(wakuMsg *wakupb.WakuMessage) er if msg.GetV1() == nil { return nil } - err = s.nc.Publish(buildNatsSubjectForGroupMessages(msg.GetV1().GroupId), wakuMsg.Payload) + natsSubject := buildNatsSubjectForGroupMessages(msg.GetV1().GroupId) + s.log.Info("publishing to nats subject from relay", zap.String("subject", natsSubject)) + err = s.nc.Publish(natsSubject, wakuMsg.Payload) if err != nil { return err } } else if topic.IsMLSV1Welcome(wakuMsg.ContentTopic) { + s.log.Info("received welcome message from waku relay", zap.String("topic", wakuMsg.ContentTopic)) var msg mlsv1.WelcomeMessage err := pb.Unmarshal(wakuMsg.Payload, &msg) if err != nil { @@ -109,7 +113,9 @@ func (s *Service) HandleIncomingWakuRelayMessage(wakuMsg *wakupb.WakuMessage) er if msg.GetV1() == nil { return nil } - err = s.nc.Publish(buildNatsSubjectForWelcomeMessages(msg.GetV1().InstallationKey), wakuMsg.Payload) + natsSubject := buildNatsSubjectForWelcomeMessages(msg.GetV1().InstallationKey) + s.log.Info("publishing to nats subject from relay", zap.String("subject", natsSubject)) + err = s.nc.Publish(natsSubject, wakuMsg.Payload) if err != nil { return err } @@ -231,6 +237,7 @@ func (s *Service) GetIdentityUpdates(ctx context.Context, req *mlsv1.GetIdentity } func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMessagesRequest) (res *emptypb.Empty, err error) { + log := s.log.Named("send-group-messages") if err = validateSendGroupMessagesRequest(req); err != nil { return nil, err } @@ -240,21 +247,25 @@ func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMes // TODO: Separate validation errors from internal errors return nil, status.Errorf(codes.InvalidArgument, "invalid group message: %s", err) } + log.Info("validated group messages", zap.Int("count", len(validationResults))) for i, result := range validationResults { input := req.Messages[i] if err = requireReadyToSend(result.GroupId, input.GetV1().Data); err != nil { + log.Warn("invalid group message", zap.Error(err)) return nil, err } // TODO: Wrap this in a transaction so publishing is all or nothing decodedGroupId, err := hex.DecodeString(result.GroupId) if err != nil { + log.Warn("invalid group id", zap.Error(err)) return nil, status.Error(codes.InvalidArgument, "invalid group id") } msg, err := s.store.InsertGroupMessage(ctx, decodedGroupId, input.GetV1().Data) if err != nil { + log.Warn("error inserting message", zap.Error(err)) if mlsstore.IsAlreadyExistsError(err) { continue } @@ -272,18 +283,22 @@ func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMes }, }) if err != nil { + log.Error("error serializing message", zap.Error(err)) return nil, err } + contentTopic := topic.BuildMLSV1GroupTopic(decodedGroupId) + err = s.publishToWakuRelay(ctx, &wakupb.WakuMessage{ - ContentTopic: topic.BuildMLSV1GroupTopic(decodedGroupId), + ContentTopic: contentTopic, Timestamp: msg.CreatedAt.UnixNano(), Payload: msgB, }) - if err != nil { + log.Error("error publishing to waku message", zap.Error(err), zap.String("contentTopic", contentTopic)) return nil, err } + log.Info("published to waku relay", zap.String("contentTopic", contentTopic)) } return &emptypb.Empty{}, nil @@ -342,6 +357,7 @@ func (s *Service) QueryWelcomeMessages(ctx context.Context, req *mlsv1.QueryWelc func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesRequest, stream mlsv1.MlsApi_SubscribeGroupMessagesServer) error { log := s.log.Named("subscribe-group-messages").With(zap.Int("filters", len(req.Filters))) log.Info("subscription started") + defer log.Info("subscription ended") // Send a header (any header) to fix an issue with Tonic based GRPC clients. // See: https://github.com/xmtp/libxmtp/pull/58 _ = stream.SendHeader(metadata.Pairs("subscribed", "true")) @@ -349,6 +365,7 @@ func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesReques var streamLock sync.Mutex for _, filter := range req.Filters { natsSubject := buildNatsSubjectForGroupMessages(filter.GroupId) + log.Info("subscribing to nats subject", zap.String("subject", natsSubject), zap.String("group_id", hex.EncodeToString(filter.GroupId))) sub, err := s.nc.Subscribe(natsSubject, func(natsMsg *nats.Msg) { log.Info("received message from nats") var msg mlsv1.GroupMessage @@ -386,6 +403,7 @@ func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesReques func (s *Service) SubscribeWelcomeMessages(req *mlsv1.SubscribeWelcomeMessagesRequest, stream mlsv1.MlsApi_SubscribeWelcomeMessagesServer) error { log := s.log.Named("subscribe-welcome-messages").With(zap.Int("filters", len(req.Filters))) log.Info("subscription started") + defer log.Info("subscription ended") // Send a header (any header) to fix an issue with Tonic based GRPC clients. // See: https://github.com/xmtp/libxmtp/pull/58 _ = stream.SendHeader(metadata.Pairs("subscribed", "true"))