Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GetInboxIDs and use validation service in PublishIdentityUpdates #382

Merged
merged 37 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
831b4e6
Initial commit
neekolas Apr 24, 2024
a99c43b
Set up sqlc
neekolas Apr 25, 2024
86b7131
Initial commit
neekolas Apr 24, 2024
2fa85f6
Set up sqlc
neekolas Apr 25, 2024
1a44dc9
Merge branch 'nm/sqlc-experiment' of github.com:xmtp/xmtp-node-go int…
neekolas Apr 25, 2024
fa8b6f7
Migrate first store methods to sqlc
neekolas Apr 25, 2024
3d192f3
Merge branch 'nm/sqlc-experiment' of github.com:xmtp/xmtp-node-go int…
neekolas Apr 25, 2024
7d9d984
Merge branch 'nm/sqlc-experiment' of github.com:xmtp/xmtp-node-go int…
neekolas Apr 25, 2024
8b434fd
Merge branch 'nm/sqlc-experiment' of github.com:xmtp/xmtp-node-go int…
neekolas Apr 25, 2024
7629bc7
Migrate first store methods to sqlc
neekolas Apr 25, 2024
5a658b5
Merge branch 'nm/sqlc-experiment' of github.com:xmtp/xmtp-node-go int…
neekolas Apr 25, 2024
14bb16b
Merge branch '04-24-migrate_first_store_methods_to_sqlc' of github.co…
neekolas Apr 25, 2024
003476f
Migrate first store methods to sqlc
neekolas Apr 25, 2024
3f72749
Merge branch '04-24-migrate_first_store_methods_to_sqlc' of github.co…
neekolas Apr 25, 2024
3e8f57a
Add GetAddressLog SQL
neekolas Apr 25, 2024
fe7a927
Merge branch '04-24-migrate_first_store_methods_to_sqlc' of github.co…
neekolas Apr 25, 2024
2b5d5f8
Add GetAddressLog SQL
neekolas Apr 25, 2024
c0f7e71
Merge branch '04-25-add_getaddresslog_sql' of github.com:xmtp/xmtp-no…
neekolas Apr 25, 2024
783c7ac
Merge branch '04-24-migrate_first_store_methods_to_sqlc' of github.co…
neekolas Apr 25, 2024
341668e
Merge branch '04-24-migrate_first_store_methods_to_sqlc' of github.co…
neekolas Apr 25, 2024
7919c2e
Add GetAddressLog SQL
neekolas Apr 25, 2024
0129dc5
Initial commit
neekolas Apr 24, 2024
83420c2
Set up sqlc
neekolas Apr 25, 2024
57a4ea2
Initial commit
neekolas Apr 24, 2024
696e17c
Set up sqlc
neekolas Apr 25, 2024
dc19304
Merge branch '04-25-add_getaddresslog_sql' of github.com:xmtp/xmtp-no…
neekolas Apr 25, 2024
065a73c
Add InsertLog Query (#383)
insipx Apr 25, 2024
e3ee7a1
Migrate first store methods to sqlc
neekolas Apr 25, 2024
f420c39
Migrate MLS DB To SQLC (#380)
neekolas Apr 25, 2024
f87e45c
Migrate first store methods to sqlc
neekolas Apr 25, 2024
40b36b0
Add InsertLog Query (#383)
insipx Apr 25, 2024
6b679b1
Merge branch '04-25-add_getaddresslog_sql' of github.com:xmtp/xmtp-no…
neekolas Apr 25, 2024
7f6a1d6
Add GetAddressLog SQL
neekolas Apr 25, 2024
31dbc3a
Merge branch '04-24-migrate_first_store_methods_to_sqlc' of github.co…
neekolas Apr 25, 2024
64d8d52
Add InsertLog Query (#383)
insipx Apr 25, 2024
4472886
Add InsertLog Query (#383)
insipx Apr 25, 2024
8182293
Merge branch '04-25-add_getaddresslog_sql' of github.com:xmtp/xmtp-no…
neekolas Apr 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions pkg/identity/api/v1/identity_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"github.com/xmtp/xmtp-node-go/pkg/mlsvalidate"
api "github.com/xmtp/xmtp-node-go/pkg/proto/identity/api/v1"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type Service struct {
Expand Down Expand Up @@ -78,7 +76,7 @@ Start transaction (SERIALIZABLE isolation level)
End transaction
*/
func (s *Service) PublishIdentityUpdate(ctx context.Context, req *api.PublishIdentityUpdateRequest) (*api.PublishIdentityUpdateResponse, error) {
return s.store.PublishIdentityUpdate(ctx, req)
return s.store.PublishIdentityUpdate(ctx, req, s.validationService)
}

func (s *Service) GetIdentityUpdates(ctx context.Context, req *api.GetIdentityUpdatesRequest) (*api.GetIdentityUpdatesResponse, error) {
Expand All @@ -97,5 +95,5 @@ func (s *Service) GetInboxIds(ctx context.Context, req *api.GetInboxIdsRequest)
for the address where revocation_sequence_id is lower or NULL
2. Return the value of the 'inbox_id' column
*/
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
return s.store.GetInboxIds(ctx, req)
}
22 changes: 21 additions & 1 deletion pkg/identity/api/v1/identity_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,27 @@ type mockedMLSValidationService struct {
}

func (m *mockedMLSValidationService) GetAssociationState(ctx context.Context, oldUpdates []*associations.IdentityUpdate, newUpdates []*associations.IdentityUpdate) (*mlsvalidate.AssociationStateResult, error) {
return nil, nil

member_map := make([]*associations.MemberMap, 0)
member_map = append(member_map, &associations.MemberMap{
Key: &associations.MemberIdentifier{Kind: &associations.MemberIdentifier_Address{Address: "key_address"}},
Value: &associations.Member{
Identifier: &associations.MemberIdentifier{Kind: &associations.MemberIdentifier_Address{Address: "ident"}},
AddedByEntity: &associations.MemberIdentifier{Kind: &associations.MemberIdentifier_Address{Address: "added_by_entity"}},
},
})

new_members := make([]*associations.MemberIdentifier, 0)

new_members = append(new_members, &associations.MemberIdentifier{Kind: &associations.MemberIdentifier_Address{Address: "0x01"}})
new_members = append(new_members, &associations.MemberIdentifier{Kind: &associations.MemberIdentifier_Address{Address: "0x02"}})
new_members = append(new_members, &associations.MemberIdentifier{Kind: &associations.MemberIdentifier_Address{Address: "0x03"}})

out := mlsvalidate.AssociationStateResult{
AssociationState: &associations.AssociationState{InboxId: "test_inbox", Members: member_map, RecoveryAddress: "recovery", SeenSignatures: [][]byte{[]byte("seen"), []byte("sig")}},
StateDiff: &associations.AssociationStateDiff{NewMembers: new_members, RemovedMembers: nil},
}
return &out, nil
}

func (m *mockedMLSValidationService) ValidateKeyPackages(ctx context.Context, keyPackages [][]byte) ([]mlsvalidate.IdentityValidationResult, error) {
Expand Down
4 changes: 0 additions & 4 deletions pkg/migrations/mls/20240411200242_init-identity.up.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
SET statement_timeout = 0;

--bun:split

CREATE TABLE inbox_log (
sequence_id BIGSERIAL PRIMARY KEY,
inbox_id TEXT NOT NULL,
Expand All @@ -10,11 +9,9 @@ CREATE TABLE inbox_log (
);

--bun:split

CREATE INDEX idx_inbox_log_inbox_id ON inbox_log(inbox_id);

--bun:split

CREATE TABLE address_log (
address TEXT NOT NULL,
inbox_id TEXT NOT NULL,
Expand All @@ -23,5 +20,4 @@ CREATE TABLE address_log (
);

--bun:split

CREATE INDEX idx_address_log_address_inbox_id ON address_log(address, inbox_id);
9 changes: 9 additions & 0 deletions pkg/mls/store/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ import (
"github.com/uptrace/bun"
)

type AddressLogEntry struct {
bun.BaseModel `bun:"table:address_log"`

Address string `bun:",notnull"`
InboxId string `bun:",notnull"`
AssociationSequenceId *uint64 `bun:","`
RevocationSequenceId *uint64 `bun:","`
}

type InboxLogEntry struct {
bun.BaseModel `bun:"table:inbox_log"`

Expand Down
39 changes: 39 additions & 0 deletions pkg/mls/store/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,31 @@ FROM inbox_log AS a
AND a.sequence_id > b.sequence_id
ORDER BY a.sequence_id ASC;

-- name: GetAddressLogs :many
SELECT a.address,
a.inbox_id,
a.association_sequence_id
FROM address_log a
INNER JOIN (
SELECT address,
MAX(association_sequence_id) AS max_association_sequence_id
FROM address_log
WHERE address = ANY (@addresses::text [])
AND revocation_sequence_id IS NULL
GROUP BY address
) b ON a.address = b.address
AND a.association_sequence_id = b.max_association_sequence_id;

-- name: InsertAddressLog :one
INSERT INTO address_log (
address,
inbox_id,
association_sequence_id,
revocation_sequence_id
)
VALUES ($1, $2, $3, $4)
RETURNING *;

-- name: InsertInboxLog :one
INSERT INTO inbox_log (
inbox_id,
Expand All @@ -24,6 +49,20 @@ INSERT INTO inbox_log (
VALUES ($1, $2, $3)
RETURNING sequence_id;

-- name: RevokeAddressFromLog :exec
UPDATE address_log
SET revocation_sequence_id = $1
WHERE (address, inbox_id, association_sequence_id) = (
SELECT address,
inbox_id,
MAX(association_sequence_id)
FROM address_log AS a
WHERE a.address = $2
AND a.inbox_id = $3
GROUP BY address,
inbox_id
);

-- name: CreateInstallation :exec
INSERT INTO installations (
id,
Expand Down
106 changes: 106 additions & 0 deletions pkg/mls/store/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 72 additions & 5 deletions pkg/mls/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/uptrace/bun/migrate"
migrations "github.com/xmtp/xmtp-node-go/pkg/migrations/mls"
queries "github.com/xmtp/xmtp-node-go/pkg/mls/store/queries"
"github.com/xmtp/xmtp-node-go/pkg/mlsvalidate"
identity "github.com/xmtp/xmtp-node-go/pkg/proto/identity/api/v1"
"github.com/xmtp/xmtp-node-go/pkg/proto/identity/associations"
mlsv1 "github.com/xmtp/xmtp-node-go/pkg/proto/mls/api/v1"
Expand All @@ -30,8 +31,9 @@ type Store struct {
}

type IdentityStore interface {
PublishIdentityUpdate(ctx context.Context, req *identity.PublishIdentityUpdateRequest) (*identity.PublishIdentityUpdateResponse, error)
PublishIdentityUpdate(ctx context.Context, req *identity.PublishIdentityUpdateRequest, validationService mlsvalidate.MLSValidationService) (*identity.PublishIdentityUpdateResponse, error)
GetInboxLogs(ctx context.Context, req *identity.GetIdentityUpdatesRequest) (*identity.GetIdentityUpdatesResponse, error)
GetInboxIds(ctx context.Context, req *identity.GetInboxIdsRequest) (*identity.GetInboxIdsResponse, error)
}

type MlsStore interface {
Expand Down Expand Up @@ -67,7 +69,38 @@ func New(ctx context.Context, config Config) (*Store, error) {
return s, nil
}

func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.PublishIdentityUpdateRequest) (*identity.PublishIdentityUpdateResponse, error) {
func (s *Store) GetInboxIds(ctx context.Context, req *identity.GetInboxIdsRequest) (*identity.GetInboxIdsResponse, error) {

addresses := []string{}
for _, request := range req.Requests {
addresses = append(addresses, request.GetAddress())
}

addressLogEntries, err := s.queries.GetAddressLogs(ctx, addresses)
if err != nil {
return nil, err
}

out := make([]*identity.GetInboxIdsResponse_Response, len(addresses))

for index, address := range addresses {
resp := identity.GetInboxIdsResponse_Response{}
resp.Address = address

for _, log_entry := range addressLogEntries {
if log_entry.Address == address {
resp.InboxId = &log_entry.InboxID
}
}
out[index] = &resp
}

return &identity.GetInboxIdsResponse{
Responses: out,
}, nil
}

func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.PublishIdentityUpdateRequest, validationService mlsvalidate.MLSValidationService) (*identity.PublishIdentityUpdateResponse, error) {
new_update := req.GetIdentityUpdate()
if new_update == nil {
return nil, errors.New("IdentityUpdate is required")
Expand All @@ -93,23 +126,57 @@ func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.Publish
}
_ = append(updates, new_update)

// TODO: Validate the updates, and abort transaction if failed
state, err := validationService.GetAssociationState(ctx, updates, []*associations.IdentityUpdate{new_update})
if err != nil {
return err
}

s.log.Info("Got association state", zap.Any("state", state))
protoBytes, err := proto.Marshal(new_update)
if err != nil {
return err
}

_, err = txQueries.InsertInboxLog(ctx, queries.InsertInboxLogParams{
sequence_id, err := txQueries.InsertInboxLog(ctx, queries.InsertInboxLogParams{
InboxID: new_update.GetInboxId(),
ServerTimestampNs: nowNs(),
IdentityUpdateProto: protoBytes,
})

s.log.Info("Inserted inbox log", zap.Any("sequence_id", sequence_id))

if err != nil {
return err
}
// TODO: Insert or update the address_log table using sequence_id

for _, new_member := range state.StateDiff.NewMembers {
s.log.Info("New member", zap.Any("member", new_member))
if address, ok := new_member.Kind.(*associations.MemberIdentifier_Address); ok {
_, err = txQueries.InsertAddressLog(ctx, queries.InsertAddressLogParams{
Address: address.Address,
InboxID: state.AssociationState.InboxId,
AssociationSequenceID: sql.NullInt64{Valid: true, Int64: sequence_id},
RevocationSequenceID: sql.NullInt64{Valid: false},
})
if err != nil {
return err
}
}
}

for _, removed_member := range state.StateDiff.RemovedMembers {
s.log.Info("New member", zap.Any("member", removed_member))
if address, ok := removed_member.Kind.(*associations.MemberIdentifier_Address); ok {
err = txQueries.RevokeAddressFromLog(ctx, queries.RevokeAddressFromLogParams{
Address: address.Address,
InboxID: state.AssociationState.InboxId,
RevocationSequenceID: sql.NullInt64{Valid: true, Int64: sequence_id},
})
if err != nil {
return err
}
}
}

return nil
}); err != nil {
Expand Down
Loading
Loading