Skip to content

Commit

Permalink
Add GetInboxIDs and use validation service in PublishIdentityUpdates (#…
Browse files Browse the repository at this point in the history
…382)

* Initial commit

* Set up sqlc

* Initial commit

* Set up sqlc

* Migrate first store methods to sqlc

* Migrate first store methods to sqlc

* Migrate first store methods to sqlc

* Add GetAddressLog SQL

* Add GetAddressLog SQL

* Merge branch '04-24-migrate_first_store_methods_to_sqlc' of github.com:xmtp/xmtp-node-go into 04-24-migrate_first_store_methods_to_sqlc

* Add GetAddressLog SQL

* Initial commit

* Set up sqlc

* Initial commit

* Set up sqlc

* Add InsertLog Query (#383)

* Add insertlog query

* validation service

* insert log

* revocation for removed members

* lint

* remove unnecessary log

* change test to use query from sqlc

* remove comments

* fix tests

* Migrate first store methods to sqlc

* Migrate MLS DB To SQLC (#380)

Bun has been bothering me for a while, so I've been working on a migration that will get rid of our ORM altogether and just use boring SQL queries for everything.

[`sqlc`](https://sqlc.dev/) is a very slick tool to generate code based on plain SQL queries using placeholders for arguments. It's not perfect...I had to do some gymnastics to make a few of the query types work.

But the fact that there is no runtime other than the standard SQL driver and some generated code outweighs its limitations IMO. There's no fancy ORM library to worry about mangling your queries, and the learning curve is basically just "how well do you know SQL".

- No support for serializable transactions
- The SQL driver is not as well maintained as PGX
- High learning curve to build complex queries, even if you know SQL well
- Relations system is not very powerful and ends up doing N+1 queries a lot of the time.
- Configuring the database with struct tags is errorprone, and there aren't great checks to make sure the struct tags actually match the schema.

- I can't find a good way to have dynamic ORDER BY expressions. So I literally have separate queries for ASC and DESC versions. It's not the end of the world, but it's very frustrating. There's an [issue to fix it](sqlc-dev/sqlc#2061), and some hacky workarounds using CASE statements, but it's not great.
- Making the filters play nice with `json_populate_recordset` is a bit of a pain. Switching to the `pgx` driver helped, since I think there was a bug in Bun's pgdriver.

We use Bun in a lot of places and for a lot of things today.

- It powers the `authz` database and all the migrations there
- It powers the migrations for the `message` database (but not the queries)
- It powers the `mls` database and all the queries in the `mlsstore`.

The priority right now is to remove it from the `mlsstore`. We will still use it for migrations (`sqlc` can read Bun migrations just fine).

This involves replacing the bun `pgdriver` with `pgx` (done in this PR) and replacing all the Bun ORM queries with `sqlc` queries. I have most of the queries written, but I'll split up the actual migration over several PRs. This can be done incrementally, but once the process is complete we can delete the Bun models.

We aren't using any of the fancy `sqlc` cloud features and have no plans to.

Ummmm. 😬. That was me.

* Migrate first store methods to sqlc

* Add InsertLog Query (#383)

* Add insertlog query

* validation service

* insert log

* revocation for removed members

* lint

* remove unnecessary log

* change test to use query from sqlc

* remove comments

* fix tests

* Add GetAddressLog SQL

* Merge branch '04-24-migrate_first_store_methods_to_sqlc' of github.com:xmtp/xmtp-node-go into 04-24-migrate_first_store_methods_to_sqlc

* Add InsertLog Query (#383)

* Add insertlog query

* validation service

* insert log

* revocation for removed members

* lint

* remove unnecessary log

* change test to use query from sqlc

* remove comments

* fix tests

* Add InsertLog Query (#383)

* Add insertlog query

* validation service

* insert log

* revocation for removed members

* lint

* remove unnecessary log

* change test to use query from sqlc

* remove comments

* fix tests

---------

Co-authored-by: Andrew Plaza <[email protected]>
  • Loading branch information
neekolas and insipx authored Apr 25, 2024
1 parent 711545a commit cb1822f
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 14 deletions.
6 changes: 2 additions & 4 deletions pkg/identity/api/v1/identity_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"github.com/xmtp/xmtp-node-go/pkg/mlsvalidate"
api "github.com/xmtp/xmtp-node-go/pkg/proto/identity/api/v1"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type Service struct {
Expand Down Expand Up @@ -78,7 +76,7 @@ Start transaction (SERIALIZABLE isolation level)
End transaction
*/
func (s *Service) PublishIdentityUpdate(ctx context.Context, req *api.PublishIdentityUpdateRequest) (*api.PublishIdentityUpdateResponse, error) {
return s.store.PublishIdentityUpdate(ctx, req)
return s.store.PublishIdentityUpdate(ctx, req, s.validationService)
}

func (s *Service) GetIdentityUpdates(ctx context.Context, req *api.GetIdentityUpdatesRequest) (*api.GetIdentityUpdatesResponse, error) {
Expand All @@ -97,5 +95,5 @@ func (s *Service) GetInboxIds(ctx context.Context, req *api.GetInboxIdsRequest)
for the address where revocation_sequence_id is lower or NULL
2. Return the value of the 'inbox_id' column
*/
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
return s.store.GetInboxIds(ctx, req)
}
22 changes: 21 additions & 1 deletion pkg/identity/api/v1/identity_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,27 @@ type mockedMLSValidationService struct {
}

func (m *mockedMLSValidationService) GetAssociationState(ctx context.Context, oldUpdates []*associations.IdentityUpdate, newUpdates []*associations.IdentityUpdate) (*mlsvalidate.AssociationStateResult, error) {
return nil, nil

member_map := make([]*associations.MemberMap, 0)
member_map = append(member_map, &associations.MemberMap{
Key: &associations.MemberIdentifier{Kind: &associations.MemberIdentifier_Address{Address: "key_address"}},
Value: &associations.Member{
Identifier: &associations.MemberIdentifier{Kind: &associations.MemberIdentifier_Address{Address: "ident"}},
AddedByEntity: &associations.MemberIdentifier{Kind: &associations.MemberIdentifier_Address{Address: "added_by_entity"}},
},
})

new_members := make([]*associations.MemberIdentifier, 0)

new_members = append(new_members, &associations.MemberIdentifier{Kind: &associations.MemberIdentifier_Address{Address: "0x01"}})
new_members = append(new_members, &associations.MemberIdentifier{Kind: &associations.MemberIdentifier_Address{Address: "0x02"}})
new_members = append(new_members, &associations.MemberIdentifier{Kind: &associations.MemberIdentifier_Address{Address: "0x03"}})

out := mlsvalidate.AssociationStateResult{
AssociationState: &associations.AssociationState{InboxId: "test_inbox", Members: member_map, RecoveryAddress: "recovery", SeenSignatures: [][]byte{[]byte("seen"), []byte("sig")}},
StateDiff: &associations.AssociationStateDiff{NewMembers: new_members, RemovedMembers: nil},
}
return &out, nil
}

func (m *mockedMLSValidationService) ValidateKeyPackages(ctx context.Context, keyPackages [][]byte) ([]mlsvalidate.IdentityValidationResult, error) {
Expand Down
4 changes: 0 additions & 4 deletions pkg/migrations/mls/20240411200242_init-identity.up.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
SET statement_timeout = 0;

--bun:split

CREATE TABLE inbox_log (
sequence_id BIGSERIAL PRIMARY KEY,
inbox_id TEXT NOT NULL,
Expand All @@ -10,11 +9,9 @@ CREATE TABLE inbox_log (
);

--bun:split

CREATE INDEX idx_inbox_log_inbox_id ON inbox_log(inbox_id);

--bun:split

CREATE TABLE address_log (
address TEXT NOT NULL,
inbox_id TEXT NOT NULL,
Expand All @@ -23,5 +20,4 @@ CREATE TABLE address_log (
);

--bun:split

CREATE INDEX idx_address_log_address_inbox_id ON address_log(address, inbox_id);
9 changes: 9 additions & 0 deletions pkg/mls/store/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ import (
"github.com/uptrace/bun"
)

type AddressLogEntry struct {
bun.BaseModel `bun:"table:address_log"`

Address string `bun:",notnull"`
InboxId string `bun:",notnull"`
AssociationSequenceId *uint64 `bun:","`
RevocationSequenceId *uint64 `bun:","`
}

type InboxLogEntry struct {
bun.BaseModel `bun:"table:inbox_log"`

Expand Down
39 changes: 39 additions & 0 deletions pkg/mls/store/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,31 @@ FROM inbox_log AS a
AND a.sequence_id > b.sequence_id
ORDER BY a.sequence_id ASC;

-- name: GetAddressLogs :many
SELECT a.address,
a.inbox_id,
a.association_sequence_id
FROM address_log a
INNER JOIN (
SELECT address,
MAX(association_sequence_id) AS max_association_sequence_id
FROM address_log
WHERE address = ANY (@addresses::text [])
AND revocation_sequence_id IS NULL
GROUP BY address
) b ON a.address = b.address
AND a.association_sequence_id = b.max_association_sequence_id;

-- name: InsertAddressLog :one
INSERT INTO address_log (
address,
inbox_id,
association_sequence_id,
revocation_sequence_id
)
VALUES ($1, $2, $3, $4)
RETURNING *;

-- name: InsertInboxLog :one
INSERT INTO inbox_log (
inbox_id,
Expand All @@ -24,6 +49,20 @@ INSERT INTO inbox_log (
VALUES ($1, $2, $3)
RETURNING sequence_id;

-- name: RevokeAddressFromLog :exec
UPDATE address_log
SET revocation_sequence_id = $1
WHERE (address, inbox_id, association_sequence_id) = (
SELECT address,
inbox_id,
MAX(association_sequence_id)
FROM address_log AS a
WHERE a.address = $2
AND a.inbox_id = $3
GROUP BY address,
inbox_id
);

-- name: CreateInstallation :exec
INSERT INTO installations (
id,
Expand Down
106 changes: 106 additions & 0 deletions pkg/mls/store/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 72 additions & 5 deletions pkg/mls/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/uptrace/bun/migrate"
migrations "github.com/xmtp/xmtp-node-go/pkg/migrations/mls"
queries "github.com/xmtp/xmtp-node-go/pkg/mls/store/queries"
"github.com/xmtp/xmtp-node-go/pkg/mlsvalidate"
identity "github.com/xmtp/xmtp-node-go/pkg/proto/identity/api/v1"
"github.com/xmtp/xmtp-node-go/pkg/proto/identity/associations"
mlsv1 "github.com/xmtp/xmtp-node-go/pkg/proto/mls/api/v1"
Expand All @@ -30,8 +31,9 @@ type Store struct {
}

type IdentityStore interface {
PublishIdentityUpdate(ctx context.Context, req *identity.PublishIdentityUpdateRequest) (*identity.PublishIdentityUpdateResponse, error)
PublishIdentityUpdate(ctx context.Context, req *identity.PublishIdentityUpdateRequest, validationService mlsvalidate.MLSValidationService) (*identity.PublishIdentityUpdateResponse, error)
GetInboxLogs(ctx context.Context, req *identity.GetIdentityUpdatesRequest) (*identity.GetIdentityUpdatesResponse, error)
GetInboxIds(ctx context.Context, req *identity.GetInboxIdsRequest) (*identity.GetInboxIdsResponse, error)
}

type MlsStore interface {
Expand Down Expand Up @@ -67,7 +69,38 @@ func New(ctx context.Context, config Config) (*Store, error) {
return s, nil
}

func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.PublishIdentityUpdateRequest) (*identity.PublishIdentityUpdateResponse, error) {
func (s *Store) GetInboxIds(ctx context.Context, req *identity.GetInboxIdsRequest) (*identity.GetInboxIdsResponse, error) {

addresses := []string{}
for _, request := range req.Requests {
addresses = append(addresses, request.GetAddress())
}

addressLogEntries, err := s.queries.GetAddressLogs(ctx, addresses)
if err != nil {
return nil, err
}

out := make([]*identity.GetInboxIdsResponse_Response, len(addresses))

for index, address := range addresses {
resp := identity.GetInboxIdsResponse_Response{}
resp.Address = address

for _, log_entry := range addressLogEntries {
if log_entry.Address == address {
resp.InboxId = &log_entry.InboxID
}
}
out[index] = &resp
}

return &identity.GetInboxIdsResponse{
Responses: out,
}, nil
}

func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.PublishIdentityUpdateRequest, validationService mlsvalidate.MLSValidationService) (*identity.PublishIdentityUpdateResponse, error) {
new_update := req.GetIdentityUpdate()
if new_update == nil {
return nil, errors.New("IdentityUpdate is required")
Expand All @@ -93,23 +126,57 @@ func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.Publish
}
_ = append(updates, new_update)

// TODO: Validate the updates, and abort transaction if failed
state, err := validationService.GetAssociationState(ctx, updates, []*associations.IdentityUpdate{new_update})
if err != nil {
return err
}

s.log.Info("Got association state", zap.Any("state", state))
protoBytes, err := proto.Marshal(new_update)
if err != nil {
return err
}

_, err = txQueries.InsertInboxLog(ctx, queries.InsertInboxLogParams{
sequence_id, err := txQueries.InsertInboxLog(ctx, queries.InsertInboxLogParams{
InboxID: new_update.GetInboxId(),
ServerTimestampNs: nowNs(),
IdentityUpdateProto: protoBytes,
})

s.log.Info("Inserted inbox log", zap.Any("sequence_id", sequence_id))

if err != nil {
return err
}
// TODO: Insert or update the address_log table using sequence_id

for _, new_member := range state.StateDiff.NewMembers {
s.log.Info("New member", zap.Any("member", new_member))
if address, ok := new_member.Kind.(*associations.MemberIdentifier_Address); ok {
_, err = txQueries.InsertAddressLog(ctx, queries.InsertAddressLogParams{
Address: address.Address,
InboxID: state.AssociationState.InboxId,
AssociationSequenceID: sql.NullInt64{Valid: true, Int64: sequence_id},
RevocationSequenceID: sql.NullInt64{Valid: false},
})
if err != nil {
return err
}
}
}

for _, removed_member := range state.StateDiff.RemovedMembers {
s.log.Info("New member", zap.Any("member", removed_member))
if address, ok := removed_member.Kind.(*associations.MemberIdentifier_Address); ok {
err = txQueries.RevokeAddressFromLog(ctx, queries.RevokeAddressFromLogParams{
Address: address.Address,
InboxID: state.AssociationState.InboxId,
RevocationSequenceID: sql.NullInt64{Valid: true, Int64: sequence_id},
})
if err != nil {
return err
}
}
}

return nil
}); err != nil {
Expand Down
Loading

0 comments on commit cb1822f

Please sign in to comment.