Skip to content

Commit

Permalink
Add a crazy amount of logging (#353)
Browse files Browse the repository at this point in the history
* Add a crazy amount of logging

* One more log

* Fix bug
  • Loading branch information
neekolas authored Feb 3, 2024
1 parent 2a7240d commit 10e6617
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (s *Server) startGRPC() error {
wakuMsg := wakuEnv.Message()

if topic.IsMLSV1(wakuMsg.ContentTopic) {
s.Log.Info("handling waku relay message in mlsv1 service", zap.String("topic", wakuMsg.ContentTopic))
if s.mlsv1 != nil {
err := s.mlsv1.HandleIncomingWakuRelayMessage(wakuEnv.Message())
if err != nil {
Expand Down
26 changes: 22 additions & 4 deletions pkg/mls/api/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (s *Service) Close() {

func (s *Service) HandleIncomingWakuRelayMessage(wakuMsg *wakupb.WakuMessage) error {
if topic.IsMLSV1Group(wakuMsg.ContentTopic) {
s.log.Info("received group message from waku relay", zap.String("topic", wakuMsg.ContentTopic))
var msg mlsv1.GroupMessage
err := pb.Unmarshal(wakuMsg.Payload, &msg)
if err != nil {
Expand All @@ -96,11 +97,14 @@ func (s *Service) HandleIncomingWakuRelayMessage(wakuMsg *wakupb.WakuMessage) er
if msg.GetV1() == nil {
return nil
}
err = s.nc.Publish(buildNatsSubjectForGroupMessages(msg.GetV1().GroupId), wakuMsg.Payload)
natsSubject := buildNatsSubjectForGroupMessages(msg.GetV1().GroupId)
s.log.Info("publishing to nats subject from relay", zap.String("subject", natsSubject))
err = s.nc.Publish(natsSubject, wakuMsg.Payload)
if err != nil {
return err
}
} else if topic.IsMLSV1Welcome(wakuMsg.ContentTopic) {
s.log.Info("received welcome message from waku relay", zap.String("topic", wakuMsg.ContentTopic))
var msg mlsv1.WelcomeMessage
err := pb.Unmarshal(wakuMsg.Payload, &msg)
if err != nil {
Expand All @@ -109,7 +113,9 @@ func (s *Service) HandleIncomingWakuRelayMessage(wakuMsg *wakupb.WakuMessage) er
if msg.GetV1() == nil {
return nil
}
err = s.nc.Publish(buildNatsSubjectForWelcomeMessages(msg.GetV1().InstallationKey), wakuMsg.Payload)
natsSubject := buildNatsSubjectForWelcomeMessages(msg.GetV1().InstallationKey)
s.log.Info("publishing to nats subject from relay", zap.String("subject", natsSubject))
err = s.nc.Publish(natsSubject, wakuMsg.Payload)
if err != nil {
return err
}
Expand Down Expand Up @@ -231,6 +237,7 @@ func (s *Service) GetIdentityUpdates(ctx context.Context, req *mlsv1.GetIdentity
}

func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMessagesRequest) (res *emptypb.Empty, err error) {
log := s.log.Named("send-group-messages")
if err = validateSendGroupMessagesRequest(req); err != nil {
return nil, err
}
Expand All @@ -240,21 +247,25 @@ func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMes
// TODO: Separate validation errors from internal errors
return nil, status.Errorf(codes.InvalidArgument, "invalid group message: %s", err)
}
log.Info("validated group messages", zap.Int("count", len(validationResults)))

for i, result := range validationResults {
input := req.Messages[i]

if err = requireReadyToSend(result.GroupId, input.GetV1().Data); err != nil {
log.Warn("invalid group message", zap.Error(err))
return nil, err
}

// TODO: Wrap this in a transaction so publishing is all or nothing
decodedGroupId, err := hex.DecodeString(result.GroupId)
if err != nil {
log.Warn("invalid group id", zap.Error(err))
return nil, status.Error(codes.InvalidArgument, "invalid group id")
}
msg, err := s.store.InsertGroupMessage(ctx, decodedGroupId, input.GetV1().Data)
if err != nil {
log.Warn("error inserting message", zap.Error(err))
if mlsstore.IsAlreadyExistsError(err) {
continue
}
Expand All @@ -272,18 +283,22 @@ func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMes
},
})
if err != nil {
log.Error("error serializing message", zap.Error(err))
return nil, err
}

contentTopic := topic.BuildMLSV1GroupTopic(decodedGroupId)

err = s.publishToWakuRelay(ctx, &wakupb.WakuMessage{
ContentTopic: topic.BuildMLSV1GroupTopic(decodedGroupId),
ContentTopic: contentTopic,
Timestamp: msg.CreatedAt.UnixNano(),
Payload: msgB,
})

if err != nil {
log.Error("error publishing to waku message", zap.Error(err), zap.String("contentTopic", contentTopic))
return nil, err
}
log.Info("published to waku relay", zap.String("contentTopic", contentTopic))
}

return &emptypb.Empty{}, nil
Expand Down Expand Up @@ -342,13 +357,15 @@ func (s *Service) QueryWelcomeMessages(ctx context.Context, req *mlsv1.QueryWelc
func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesRequest, stream mlsv1.MlsApi_SubscribeGroupMessagesServer) error {
log := s.log.Named("subscribe-group-messages").With(zap.Int("filters", len(req.Filters)))
log.Info("subscription started")
defer log.Info("subscription ended")
// Send a header (any header) to fix an issue with Tonic based GRPC clients.
// See: https://github.com/xmtp/libxmtp/pull/58
_ = stream.SendHeader(metadata.Pairs("subscribed", "true"))

var streamLock sync.Mutex
for _, filter := range req.Filters {
natsSubject := buildNatsSubjectForGroupMessages(filter.GroupId)
log.Info("subscribing to nats subject", zap.String("subject", natsSubject), zap.String("group_id", hex.EncodeToString(filter.GroupId)))
sub, err := s.nc.Subscribe(natsSubject, func(natsMsg *nats.Msg) {
log.Info("received message from nats")
var msg mlsv1.GroupMessage
Expand Down Expand Up @@ -386,6 +403,7 @@ func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesReques
func (s *Service) SubscribeWelcomeMessages(req *mlsv1.SubscribeWelcomeMessagesRequest, stream mlsv1.MlsApi_SubscribeWelcomeMessagesServer) error {
log := s.log.Named("subscribe-welcome-messages").With(zap.Int("filters", len(req.Filters)))
log.Info("subscription started")
defer log.Info("subscription ended")
// Send a header (any header) to fix an issue with Tonic based GRPC clients.
// See: https://github.com/xmtp/libxmtp/pull/58
_ = stream.SendHeader(metadata.Pairs("subscribed", "true"))
Expand Down

0 comments on commit 10e6617

Please sign in to comment.