Skip to content

Commit

Permalink
Allow MLS topics in subscribeall (#356)
Browse files Browse the repository at this point in the history
* Allow MLS topics in subscribeall

* Remove noisy log
  • Loading branch information
neekolas authored Feb 21, 2024
1 parent fad7a17 commit dcd9beb
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 7 deletions.
4 changes: 2 additions & 2 deletions pkg/api/message/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ func buildEnvelope(msg *wakupb.WakuMessage) *proto.Envelope {
}
}

func isValidSubscribeAllTopic(topic string) bool {
return strings.HasPrefix(topic, validXMTPTopicPrefix)
func isValidSubscribeAllTopic(contentTopic string) bool {
return strings.HasPrefix(contentTopic, validXMTPTopicPrefix) || topic.IsMLSV1(contentTopic)
}

func fromWakuTimestamp(ts int64) uint64 {
Expand Down
1 change: 0 additions & 1 deletion pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ func (s *Server) startGRPC() error {
wakuMsg := wakuEnv.Message()

if topic.IsMLSV1(wakuMsg.ContentTopic) {
s.Log.Info("handling waku relay message in mlsv1 service", zap.String("topic", wakuMsg.ContentTopic))
if s.mlsv1 != nil {
err := s.mlsv1.HandleIncomingWakuRelayMessage(wakuEnv.Message())
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions pkg/mls/api/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,17 +363,14 @@ func (s *Service) QueryWelcomeMessages(ctx context.Context, req *mlsv1.QueryWelc
func (s *Service) SubscribeGroupMessages(req *mlsv1.SubscribeGroupMessagesRequest, stream mlsv1.MlsApi_SubscribeGroupMessagesServer) error {
log := s.log.Named("subscribe-group-messages").With(zap.Int("filters", len(req.Filters)))
log.Info("subscription started")
defer log.Info("subscription ended")
// Send a header (any header) to fix an issue with Tonic based GRPC clients.
// See: https://github.com/xmtp/libxmtp/pull/58
_ = stream.SendHeader(metadata.Pairs("subscribed", "true"))

var streamLock sync.Mutex
for _, filter := range req.Filters {
natsSubject := buildNatsSubjectForGroupMessages(filter.GroupId)
log.Info("subscribing to nats subject", zap.String("subject", natsSubject), zap.String("group_id", hex.EncodeToString(filter.GroupId)))
sub, err := s.nc.Subscribe(natsSubject, func(natsMsg *nats.Msg) {
log.Info("received message from nats")
var msg mlsv1.GroupMessage
err := pb.Unmarshal(natsMsg.Data, &msg)
if err != nil {
Expand Down Expand Up @@ -418,7 +415,6 @@ func (s *Service) SubscribeWelcomeMessages(req *mlsv1.SubscribeWelcomeMessagesRe
for _, filter := range req.Filters {
natsSubject := buildNatsSubjectForWelcomeMessages(filter.InstallationKey)
sub, err := s.nc.Subscribe(natsSubject, func(natsMsg *nats.Msg) {
log.Info("received message from nats")
var msg mlsv1.WelcomeMessage
err := pb.Unmarshal(natsMsg.Data, &msg)
if err != nil {
Expand Down

0 comments on commit dcd9beb

Please sign in to comment.