Skip to content

Commit

Permalink
Log when subscriptions start (#352)
Browse files Browse the repository at this point in the history
* Log when subscriptions start

* Log even more
  • Loading branch information
neekolas authored Feb 2, 2024
1 parent 8c7d894 commit 2a7240d
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions pkg/mls/api/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func (s *Service) SendGroupMessages(ctx context.Context, req *mlsv1.SendGroupMes
Timestamp: msg.CreatedAt.UnixNano(),
Payload: msgB,
})

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -340,7 +341,7 @@ func (s *Service) QueryWelcomeMessages(ctx context.Context, req *mlsv1.QueryWelc

func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesRequest, stream mlsv1.MlsApi_SubscribeGroupMessagesServer) error {
log := s.log.Named("subscribe-group-messages").With(zap.Int("filters", len(req.Filters)))

log.Info("subscription started")
// Send a header (any header) to fix an issue with Tonic based GRPC clients.
// See: https://github.com/xmtp/libxmtp/pull/58
_ = stream.SendHeader(metadata.Pairs("subscribed", "true"))
Expand All @@ -349,6 +350,7 @@ func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesReques
for _, filter := range req.Filters {
natsSubject := buildNatsSubjectForGroupMessages(filter.GroupId)
sub, err := s.nc.Subscribe(natsSubject, func(natsMsg *nats.Msg) {
log.Info("received message from nats")
var msg mlsv1.GroupMessage
err := pb.Unmarshal(natsMsg.Data, &msg)
if err != nil {
Expand Down Expand Up @@ -383,7 +385,7 @@ func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesReques

func (s *Service) SubscribeWelcomeMessages(req *mlsv1.SubscribeWelcomeMessagesRequest, stream mlsv1.MlsApi_SubscribeWelcomeMessagesServer) error {
log := s.log.Named("subscribe-welcome-messages").With(zap.Int("filters", len(req.Filters)))

log.Info("subscription started")
// Send a header (any header) to fix an issue with Tonic based GRPC clients.
// See: https://github.com/xmtp/libxmtp/pull/58
_ = stream.SendHeader(metadata.Pairs("subscribed", "true"))
Expand All @@ -392,6 +394,7 @@ func (s *Service) SubscribeWelcomeMessages(req *mlsv1.SubscribeWelcomeMessagesRe
for _, filter := range req.Filters {
natsSubject := buildNatsSubjectForWelcomeMessages(filter.InstallationKey)
sub, err := s.nc.Subscribe(natsSubject, func(natsMsg *nats.Msg) {
log.Info("received message from nats")
var msg mlsv1.WelcomeMessage
err := pb.Unmarshal(natsMsg.Data, &msg)
if err != nil {
Expand Down

0 comments on commit 2a7240d

Please sign in to comment.