From bc525a52485bf023d6ea1c89ce86e8e979986e97 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Wed, 24 Apr 2024 16:43:17 -0400 Subject: [PATCH 1/5] GetInboxIds --- pkg/authn/authn.pb.go | 4 +- pkg/identity/api/v1/identity_service.go | 8 ++-- pkg/mls/store/models.go | 9 +++++ pkg/mls/store/store.go | 45 ++++++++++++++++++++++ pkg/mls/store/store_test.go | 50 +++++++++++++++++++++++++ 5 files changed, 111 insertions(+), 5 deletions(-) diff --git a/pkg/authn/authn.pb.go b/pkg/authn/authn.pb.go index c0de527b..11767aff 100644 --- a/pkg/authn/authn.pb.go +++ b/pkg/authn/authn.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v4.25.3 +// protoc-gen-go v1.33.0 +// protoc v4.24.4 // source: authn.proto package authn diff --git a/pkg/identity/api/v1/identity_service.go b/pkg/identity/api/v1/identity_service.go index 7f36b31d..19da6de0 100644 --- a/pkg/identity/api/v1/identity_service.go +++ b/pkg/identity/api/v1/identity_service.go @@ -7,8 +7,8 @@ import ( "github.com/xmtp/xmtp-node-go/pkg/mlsvalidate" api "github.com/xmtp/xmtp-node-go/pkg/proto/identity/api/v1" "go.uber.org/zap" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + // "google.golang.org/grpc/codes" + // "google.golang.org/grpc/status" ) type Service struct { @@ -91,11 +91,13 @@ func (s *Service) GetIdentityUpdates(ctx context.Context, req *api.GetIdentityUp } func (s *Service) GetInboxIds(ctx context.Context, req *api.GetInboxIdsRequest) (*api.GetInboxIdsResponse, error) { + // Needs a group by statement /* Algorithm for each request: 1. Query the address_log table for the largest association_sequence_id for the address where revocation_sequence_id is lower or NULL 2. Return the value of the 'inbox_id' column */ - return nil, status.Errorf(codes.Unimplemented, "unimplemented") + + return s.store.GetInboxIds(ctx, req) } diff --git a/pkg/mls/store/models.go b/pkg/mls/store/models.go index 47d7145b..75e9ff27 100644 --- a/pkg/mls/store/models.go +++ b/pkg/mls/store/models.go @@ -24,6 +24,15 @@ type InboxLogEntry struct { IdentityUpdateProto []byte } +type AddressLogEntry struct { + bun.BaseModel `bun:"table:address_log"` + + Address string `bun:",notnull"` + InboxId string `bun:",notnull"` + AssociationSequenceId *uint64 `bun:","` + RevocationSequenceId *uint64 `bun:","` +} + type Installation struct { bun.BaseModel `bun:"table:installations"` diff --git a/pkg/mls/store/store.go b/pkg/mls/store/store.go index db2300da..4ba604c4 100644 --- a/pkg/mls/store/store.go +++ b/pkg/mls/store/store.go @@ -203,6 +203,51 @@ func (s *Store) GetInboxLogs(ctx context.Context, batched_req *identity.GetIdent }, nil } +func (s *Store) GetInboxIds(ctx context.Context, req *identity.GetInboxIdsRequest) (*identity.GetInboxIdsResponse, error) { + + addresses := []string{} + for _, request := range req.Requests { + addresses = append(addresses, request.GetAddress()) + } + s.log.Info("GetInboxIds", zap.Strings("addresses", addresses)) + + db_log_entries := make([]*AddressLogEntry, 0) + + // maybe restrict select to only fields in groupby? + err := s.db.NewSelect(). + Model(&db_log_entries). + Where("address IN (?)", bun.In(addresses)). + Where("revocation_sequence_id IS NULL OR revocation_sequence_id < association_sequence_id"). + Group("address", "inbox_id", "association_sequence_id", "revocation_sequence_id"). + Order("association_sequence_id DESC"). + Scan(ctx) + + s.log.Error("Db", zap.Any("err", err)) + if err != nil { + return nil, err + } + + s.log.Info("Db", zap.Any("db_log_entries", db_log_entries)) + + out := make([]*identity.GetInboxIdsResponse_Response, len(addresses)) + + for index, address := range addresses { + resp := identity.GetInboxIdsResponse_Response{} + resp.Address = address + + for _, log_entry := range db_log_entries { + if log_entry.Address == address { + resp.InboxId = &log_entry.InboxId + } + } + out[index] = &resp + } + + return &identity.GetInboxIdsResponse{ + Responses: out, + }, nil +} + // Creates the installation and last resort key package func (s *Store) CreateInstallation(ctx context.Context, installationId []byte, walletAddress string, credentialIdentity, keyPackage []byte, expiration uint64) error { createdAt := nowNs() diff --git a/pkg/mls/store/store_test.go b/pkg/mls/store/store_test.go index 0f74996e..306d1032 100644 --- a/pkg/mls/store/store_test.go +++ b/pkg/mls/store/store_test.go @@ -657,3 +657,53 @@ func TestQueryWelcomeMessagesV1_Paginate(t *testing.T) { require.Equal(t, []byte("content7"), resp.Messages[0].GetV1().Data) require.Equal(t, []byte("content8"), resp.Messages[1].GetV1().Data) } + +func InsertAddressLog(store *Store, address string, inboxId string, associationSequenceId *uint64, revocationSequenceId *uint64) error { + + entry := AddressLogEntry{ + Address: address, + InboxId: inboxId, + AssociationSequenceId: associationSequenceId, + RevocationSequenceId: revocationSequenceId, + } + ctx := context.Background() + + _, err := store.db.NewInsert(). + Model(&entry). + Exec(ctx) + + return err +} + +func TestInboxIds(t *testing.T) { + store, cleanup := NewTestStore(t) + defer cleanup() + + seq, rev := uint64(1), uint64(5) + err := InsertAddressLog(store, "address", "inbox1", &seq, &rev) + seq, rev = uint64(2), uint64(8) + err = InsertAddressLog(store, "address", "inbox1", &seq, &rev) + seq, rev = uint64(3), uint64(9) + err = InsertAddressLog(store, "address", "inbox1", &seq, &rev) + seq, rev = uint64(4), uint64(1) + err = InsertAddressLog(store, "address", "correct", &seq, &rev) + require.NoError(t, err) + + reqs := make([]*identity.GetInboxIdsRequest_Request, 0) + reqs = append(reqs, &identity.GetInboxIdsRequest_Request{ + Address: "address", + }) + req := &identity.GetInboxIdsRequest{ + Requests: reqs, + } + resp, _ := store.GetInboxIds(context.Background(), req) + + require.Equal(t, "correct", *resp.Responses[0].InboxId) + + seq = uint64(5) + err = InsertAddressLog(store, "address", "correct_inbox2", &seq, nil) + resp, _ = store.GetInboxIds(context.Background(), req) + t.Log(resp) + require.Equal(t, "correct_inbox2", *resp.Responses[0].InboxId) + +} From b88460b278a23db865a1039967e93e208d1cf9f2 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Wed, 24 Apr 2024 18:28:44 -0400 Subject: [PATCH 2/5] fix query --- pkg/mls/store/store.go | 21 ++++++++++++--------- pkg/mls/store/store_test.go | 9 ++++++++- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/pkg/mls/store/store.go b/pkg/mls/store/store.go index 4ba604c4..6b412bff 100644 --- a/pkg/mls/store/store.go +++ b/pkg/mls/store/store.go @@ -203,32 +203,35 @@ func (s *Store) GetInboxLogs(ctx context.Context, batched_req *identity.GetIdent }, nil } +type GroupedAddressLogEntry struct { + Address string + InboxId string + MaxAssociationSequenceId uint64 `bun:max_association_sequence_id",notnull"` +} + func (s *Store) GetInboxIds(ctx context.Context, req *identity.GetInboxIdsRequest) (*identity.GetInboxIdsResponse, error) { addresses := []string{} for _, request := range req.Requests { addresses = append(addresses, request.GetAddress()) } - s.log.Info("GetInboxIds", zap.Strings("addresses", addresses)) - db_log_entries := make([]*AddressLogEntry, 0) + db_log_entries := make([]*GroupedAddressLogEntry, 0) // maybe restrict select to only fields in groupby? err := s.db.NewSelect(). - Model(&db_log_entries). + Model((*AddressLogEntry)(nil)). + ColumnExpr("address, inbox_id, MAX(association_sequence_id) as max_association_sequence_id"). Where("address IN (?)", bun.In(addresses)). Where("revocation_sequence_id IS NULL OR revocation_sequence_id < association_sequence_id"). - Group("address", "inbox_id", "association_sequence_id", "revocation_sequence_id"). - Order("association_sequence_id DESC"). - Scan(ctx) + Group("address", "inbox_id"). + OrderExpr("MAX(association_sequence_id) ASC"). + Scan(ctx, &db_log_entries) - s.log.Error("Db", zap.Any("err", err)) if err != nil { return nil, err } - s.log.Info("Db", zap.Any("db_log_entries", db_log_entries)) - out := make([]*identity.GetInboxIdsResponse_Response, len(addresses)) for index, address := range addresses { diff --git a/pkg/mls/store/store_test.go b/pkg/mls/store/store_test.go index 306d1032..26ea81a3 100644 --- a/pkg/mls/store/store_test.go +++ b/pkg/mls/store/store_test.go @@ -703,7 +703,14 @@ func TestInboxIds(t *testing.T) { seq = uint64(5) err = InsertAddressLog(store, "address", "correct_inbox2", &seq, nil) resp, _ = store.GetInboxIds(context.Background(), req) - t.Log(resp) require.Equal(t, "correct_inbox2", *resp.Responses[0].InboxId) + reqs = append(reqs, &identity.GetInboxIdsRequest_Request{Address: "address2"}) + req = &identity.GetInboxIdsRequest{ + Requests: reqs, + } + seq, rev = uint64(8), uint64(2) + err = InsertAddressLog(store, "address2", "inbox2", &seq, &rev) + resp, _ = store.GetInboxIds(context.Background(), req) + require.Equal(t, "inbox2", *resp.Responses[1].InboxId) } From 39d07a5ab8b14a6da2023d3d53f140d80d71b712 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Wed, 24 Apr 2024 18:32:42 -0400 Subject: [PATCH 3/5] fix golangci lint --- pkg/mls/store/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/mls/store/store.go b/pkg/mls/store/store.go index 6b412bff..a54c2282 100644 --- a/pkg/mls/store/store.go +++ b/pkg/mls/store/store.go @@ -206,7 +206,7 @@ func (s *Store) GetInboxLogs(ctx context.Context, batched_req *identity.GetIdent type GroupedAddressLogEntry struct { Address string InboxId string - MaxAssociationSequenceId uint64 `bun:max_association_sequence_id",notnull"` + MaxAssociationSequenceId uint64 } func (s *Store) GetInboxIds(ctx context.Context, req *identity.GetInboxIdsRequest) (*identity.GetInboxIdsResponse, error) { From 2305c3c1ab5a29436e1f264951cafd9313ff896f Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Wed, 24 Apr 2024 18:36:51 -0400 Subject: [PATCH 4/5] fix err lints --- pkg/identity/api/v1/identity_service.go | 1 - pkg/mls/store/store_test.go | 5 +++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/identity/api/v1/identity_service.go b/pkg/identity/api/v1/identity_service.go index 19da6de0..18d30c77 100644 --- a/pkg/identity/api/v1/identity_service.go +++ b/pkg/identity/api/v1/identity_service.go @@ -91,7 +91,6 @@ func (s *Service) GetIdentityUpdates(ctx context.Context, req *api.GetIdentityUp } func (s *Service) GetInboxIds(ctx context.Context, req *api.GetInboxIdsRequest) (*api.GetInboxIdsResponse, error) { - // Needs a group by statement /* Algorithm for each request: 1. Query the address_log table for the largest association_sequence_id diff --git a/pkg/mls/store/store_test.go b/pkg/mls/store/store_test.go index 26ea81a3..50d49ded 100644 --- a/pkg/mls/store/store_test.go +++ b/pkg/mls/store/store_test.go @@ -681,10 +681,13 @@ func TestInboxIds(t *testing.T) { seq, rev := uint64(1), uint64(5) err := InsertAddressLog(store, "address", "inbox1", &seq, &rev) + require.NoError(t, err) seq, rev = uint64(2), uint64(8) err = InsertAddressLog(store, "address", "inbox1", &seq, &rev) + require.NoError(t, err) seq, rev = uint64(3), uint64(9) err = InsertAddressLog(store, "address", "inbox1", &seq, &rev) + require.NoError(t, err) seq, rev = uint64(4), uint64(1) err = InsertAddressLog(store, "address", "correct", &seq, &rev) require.NoError(t, err) @@ -702,6 +705,7 @@ func TestInboxIds(t *testing.T) { seq = uint64(5) err = InsertAddressLog(store, "address", "correct_inbox2", &seq, nil) + require.NoError(t, err) resp, _ = store.GetInboxIds(context.Background(), req) require.Equal(t, "correct_inbox2", *resp.Responses[0].InboxId) @@ -711,6 +715,7 @@ func TestInboxIds(t *testing.T) { } seq, rev = uint64(8), uint64(2) err = InsertAddressLog(store, "address2", "inbox2", &seq, &rev) + require.NoError(t, err) resp, _ = store.GetInboxIds(context.Background(), req) require.Equal(t, "inbox2", *resp.Responses[1].InboxId) } From 0f9aeeed7fb4cf269c9636837967435b9ca06f67 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Thu, 25 Apr 2024 11:14:32 -0400 Subject: [PATCH 5/5] latest --- pkg/mls/store/store.go | 22 ++++++++++++++-------- pkg/mls/store/store_test.go | 1 + 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/pkg/mls/store/store.go b/pkg/mls/store/store.go index a54c2282..d182401c 100644 --- a/pkg/mls/store/store.go +++ b/pkg/mls/store/store.go @@ -216,18 +216,24 @@ func (s *Store) GetInboxIds(ctx context.Context, req *identity.GetInboxIdsReques addresses = append(addresses, request.GetAddress()) } - db_log_entries := make([]*GroupedAddressLogEntry, 0) + db_log_entries := make([]*AddressLogEntry, 0) - // maybe restrict select to only fields in groupby? - err := s.db.NewSelect(). - Model((*AddressLogEntry)(nil)). - ColumnExpr("address, inbox_id, MAX(association_sequence_id) as max_association_sequence_id"). + s.log.Info("Addresses", zap.Any("addresses", addresses)) + subquery := s.db.NewSelect(). + Column("address"). + ColumnExpr("MAX(association_sequence_id) AS max_association_sequence_id"). + Table("address_log"). Where("address IN (?)", bun.In(addresses)). - Where("revocation_sequence_id IS NULL OR revocation_sequence_id < association_sequence_id"). - Group("address", "inbox_id"). - OrderExpr("MAX(association_sequence_id) ASC"). + Group("address") + + err := s.db.NewSelect(). + Column("a.address", "a.inbox_id", "a.association_sequence_id"). + TableExpr("address_log AS a"). + Join("INNER JOIN (?) AS b ON a.address = b.address AND a.association_sequence_id = b.max_association_sequence_id", subquery). Scan(ctx, &db_log_entries) + s.log.Info("GetInboxIds", zap.Any("db_log_entries", db_log_entries)) + if err != nil { return nil, err } diff --git a/pkg/mls/store/store_test.go b/pkg/mls/store/store_test.go index 50d49ded..15371925 100644 --- a/pkg/mls/store/store_test.go +++ b/pkg/mls/store/store_test.go @@ -700,6 +700,7 @@ func TestInboxIds(t *testing.T) { Requests: reqs, } resp, _ := store.GetInboxIds(context.Background(), req) + t.Log(resp) require.Equal(t, "correct", *resp.Responses[0].InboxId)