Skip to content

Commit

Permalink
Improve adding to and revoking from address log (#190)
Browse files Browse the repository at this point in the history
## tl;dr

- Addresses comment on the [insertAddressLog](https://app.graphite.dev/github/pr/xmtp/xmtpd/175/Publish-IdentityUpdates-to-blockchain-and-read-back?utm_source=gt-slack-notif#comment-PRRC_kwDOI4BqqM5qhcHD) query from the last PR
- Adds more tests for address log operations

## AI Generated Summary

Updated the `InsertAddressLog` query to handle conflicts and added comprehensive tests for address log operations.

### What changed?

- Modified the `InsertAddressLog` SQL query to handle conflicts on `(address, inbox_id)`.
- Updated the conflict resolution logic to set `revocation_sequence_id` to NULL and update `association_sequence_id` when appropriate.
- Changed the return type of `InsertAddressLog` function to return the number of affected rows.
- Added new test file `queries_test.go` with tests for `InsertAddressLog` and `RevokeAddressLog` operations.
- Updated the `IdentityUpdateStorer` to log a warning when `InsertAddressLog` affects 0 rows.

### How to test?

1. Run the new tests in `queries_test.go`:
   ```
   go test ./pkg/db -v
   ```
2. Verify that all tests pass, especially those related to `InsertAddressLog` and `RevokeAddressLog`.
3. Test the `IdentityUpdateStorer` with various scenarios to ensure proper logging of warnings when `InsertAddressLog` affects 0 rows.

### Why make this change?

This change improves the handling of address log insertions by properly managing conflicts and providing more detailed information about the operation's success. The added tests ensure the reliability of these critical database operations, while the warning log in the `IdentityUpdateStorer` helps identify potential issues with address log insertions during runtime.
  • Loading branch information
neekolas committed Oct 8, 2024
1 parent b2523d5 commit 42f7d7d
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 19 deletions.
12 changes: 8 additions & 4 deletions pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,15 @@ FROM
address) b ON a.address = b.address
AND a.association_sequence_id = b.max_association_sequence_id;

-- name: InsertAddressLog :exec
-- name: InsertAddressLog :execrows
INSERT INTO address_log(address, inbox_id, association_sequence_id, revocation_sequence_id)
VALUES (@address, decode(@inbox_id, 'hex'), @association_sequence_id, @revocation_sequence_id)
ON CONFLICT
DO NOTHING;
VALUES (@address, decode(@inbox_id, 'hex'), @association_sequence_id, NULL)
ON CONFLICT (address, inbox_id)
DO UPDATE SET
revocation_sequence_id = NULL, association_sequence_id = @association_sequence_id
WHERE (address_log.revocation_sequence_id IS NULL
OR address_log.revocation_sequence_id < @association_sequence_id)
AND address_log.association_sequence_id < @association_sequence_id;

-- name: RevokeAddressFromLog :execrows
UPDATE
Expand Down
27 changes: 14 additions & 13 deletions pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

137 changes: 137 additions & 0 deletions pkg/db/queries_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package db

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/testutils"
)

func getAddressLogState(
t *testing.T,
querier *queries.Queries,
address string,
inboxId string,
) *queries.GetAddressLogsRow {
addressLogs, err := querier.GetAddressLogs(context.Background(), []string{address})
require.NoError(t, err)

if len(addressLogs) == 0 {
return nil
}

addressLog := addressLogs[0]
require.Equal(t, addressLog.InboxID, inboxId)

return &addressLog
}

func TestInsertAddressLog(t *testing.T) {
ctx := context.Background()
db, _, cleanup := testutils.NewDB(t, ctx)
defer cleanup()

querier := queries.New(db)

address := testutils.RandomString(20)
inboxId := testutils.RandomInboxId()

_, err := querier.InsertAddressLog(
ctx,
queries.InsertAddressLogParams{
Address: address,
InboxID: inboxId,
AssociationSequenceID: NullInt64(1),
},
)
require.NoError(t, err)

addressLog := getAddressLogState(t, querier, address, inboxId)
require.NotNil(t, addressLog)
require.Equal(t, addressLog.AssociationSequenceID.Int64, int64(1))

// Now insert a new entry with a higher sequence id
_, err = querier.InsertAddressLog(
ctx,
queries.InsertAddressLogParams{
Address: address,
InboxID: inboxId,
AssociationSequenceID: NullInt64(2),
},
)
require.NoError(t, err)

addressLog = getAddressLogState(t, querier, address, inboxId)
require.NotNil(t, addressLog)
require.Equal(t, addressLog.AssociationSequenceID.Int64, int64(2))

// Try to set it back to 1. This should be a no-op
numRows, err := querier.InsertAddressLog(
ctx,
queries.InsertAddressLogParams{
Address: address,
InboxID: inboxId,
AssociationSequenceID: NullInt64(1),
},
)
require.NoError(t, err)
require.Equal(t, numRows, int64(0))

addressLog = getAddressLogState(t, querier, address, inboxId)
require.NotNil(t, addressLog)
require.Equal(t, addressLog.AssociationSequenceID.Int64, int64(2))
}

func TestRevokeAddressLog(t *testing.T) {
ctx := context.Background()
db, _, cleanup := testutils.NewDB(t, ctx)
defer cleanup()

querier := queries.New(db)

address := testutils.RandomString(20)
inboxId := testutils.RandomInboxId()

_, err := querier.InsertAddressLog(
ctx,
queries.InsertAddressLogParams{
Address: address,
InboxID: inboxId,
AssociationSequenceID: NullInt64(1),
},
)
require.NoError(t, err)

numRows, err := querier.RevokeAddressFromLog(
ctx,
queries.RevokeAddressFromLogParams{
Address: address,
InboxID: inboxId,
RevocationSequenceID: NullInt64(2),
},
)
require.NoError(t, err)
require.Equal(t, numRows, int64(1))

addressLog := getAddressLogState(t, querier, address, inboxId)
require.Nil(t, addressLog)

// Now try to associate it a second time

numRows, err = querier.InsertAddressLog(
ctx,
queries.InsertAddressLogParams{
Address: address,
InboxID: inboxId,
AssociationSequenceID: NullInt64(3),
},
)
require.NoError(t, err)
require.Equal(t, numRows, int64(1))

addressLog = getAddressLogState(t, querier, address, inboxId)
require.NotNil(t, addressLog)
require.Equal(t, addressLog.AssociationSequenceID.Int64, int64(3))
}
11 changes: 9 additions & 2 deletions pkg/indexer/storer/identityUpdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,25 @@ func (s *IdentityUpdateStorer) StoreLog(ctx context.Context, event types.Log) Lo
for _, new_member := range associationState.StateDiff.NewMembers {
s.logger.Info("New member", zap.Any("member", new_member))
if address, ok := new_member.Kind.(*associations.MemberIdentifier_Address); ok {
err = querier.InsertAddressLog(ctx, queries.InsertAddressLogParams{
numRows, err := querier.InsertAddressLog(ctx, queries.InsertAddressLogParams{
Address: address.Address,
InboxID: inboxId,
AssociationSequenceID: sql.NullInt64{
Valid: true,
Int64: int64(msgSent.SequenceId),
},
RevocationSequenceID: sql.NullInt64{Valid: false},
})
if err != nil {
return NewLogStorageError(err, true)
}
if numRows == 0 {
s.logger.Warn(
"Could not insert address log",
zap.String("address", address.Address),
zap.String("inbox_id", inboxId),
zap.Int("sequence_id", int(msgSent.SequenceId)),
)
}
}
}

Expand Down

0 comments on commit 42f7d7d

Please sign in to comment.