Skip to content

Commit

Permalink
Use transaction, merge address_log and address_lookup, add log size l…
Browse files Browse the repository at this point in the history
…imits
  • Loading branch information
richardhuaaa committed Apr 12, 2024
1 parent a9a032b commit c9cd25c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 41 deletions.
54 changes: 27 additions & 27 deletions pkg/identity/api/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,42 +44,42 @@ func (s *Service) Close() {
/*
Algorithm:
Note - the inbox_log table has global ordering via a serial sequence_ID, and an index by inbox_ID
1. Append update to DB under inbox_id with commit_status UNCOMMITTED
2. ProcessLog(inbox_id):
- Read the log for the inbox_id
- Validate log sequentially
- Update UNCOMMITTED rows to either VALIDATED or delete them based on the validation result
- For each row that is VALIDATED:
- Add it to the relevant address log.
- Note: There may be races between multiple ProcessLog() calls on the same inbox,
or across multiple inboxes. The address log can use a unique index on
inbox_log_sequence_ID to prevent duplicate updates and establish ordering.
- Process the address log and cache the XID into a third table (address_lookup_cache)
- Note: To prevent new data overwriting old data, the address_lookup_cache stores the
inbox_log_sequence_id, and we do an atomic update WHERE new_sequence_id > old_sequence_id
- Update the row in the inbox_id table to COMMITTED
3. Return success from the API if the original identity update was COMMITTED, else return error
If the server goes down in the middle of processing an update, subsequent ProcessLog() calls will
pick up where the previous one left off.
The client is expected to retry with the same payload, and the server is expected to deduplicate the update.
Start transaction
1. Insert the update into the inbox_log table
2. Read the log for the inbox_id, ordering by sequence_id
3. If the log has more than 256 entries, abort the transaction.
3. Validate it sequentially. If failed, abort the transaction.
4. For each affected address:
a. Insert the update into the address_log table
-- Note: There may be races across multiple inboxes. The address_log can use
-- inbox_log_sequence_ID to establish ordering.
b. Read the log for the address, ordering by *inbox_log_sequence_id*
c. If the log has more than 256 entries, abort the transaction.
c. Process the address log (without signature validation) and compute the final inbox ID
d. Update the inserted entry with the result.
End transaction
*/
func (s *Service) PublishIdentityUpdate(ctx context.Context, req *api.PublishIdentityUpdateRequest) (*api.PublishIdentityUpdateResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
}

func (s *Service) GetIdentityUpdates(ctx context.Context, req *api.GetIdentityUpdatesRequest) (*api.GetIdentityUpdatesResponse, error) {
// Algorithm:
// 1. Query the relevant inbox_log tables, filtering to COMMITTED rows
// 2. Return the updates in the response
/*
Algorithm for each request:
1. Query the inbox_log table for the inbox_id, ordering by sequence_id
2. Return all of the entries
*/
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
}

func (s *Service) GetInboxIds(ctx context.Context, req *api.GetInboxIdsRequest) (*api.GetInboxIdsResponse, error) {
// Algorithm:
// 1. Query the address_lookup_cache for each address
// 2. Return the result
/*
Algorithm for each request:
1. Query the address_log table for the latest sequence_id on the address
-- Note: NOT inbox_log_sequence_id
2. Return the value of the 'inbox_id' column
*/
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
}
4 changes: 0 additions & 4 deletions pkg/migrations/mls/20240411200242_init-identity.down.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,3 @@ DROP TABLE IF EXISTS inbox_log;
--bun:split

DROP TABLE IF EXISTS address_log;

--bun:split

DROP TABLE IF EXISTS address_lookup_cache;
14 changes: 4 additions & 10 deletions pkg/migrations/mls/20240411200242_init-identity.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,24 @@ SET statement_timeout = 0;
CREATE TABLE inbox_log (
sequence_id BIGSERIAL PRIMARY KEY,
inbox_id TEXT NOT NULL,
commit_status SMALLINT NOT NULL,
server_timestamp_ns BIGINT NOT NULL,
identity_update_proto BYTEA NOT NULL
);

--bun:split

CREATE INDEX idx_inbox_log_inbox_id_commit_status ON inbox_log(inbox_id, commit_status);
CREATE INDEX idx_inbox_log_inbox_id ON inbox_log(inbox_id);

--bun:split

CREATE TABLE address_log (
inbox_log_sequence_id BIGINT PRIMARY KEY,
sequence_id BIGSERIAL PRIMARY KEY,
inbox_log_sequence_id BIGINT,
address TEXT NOT NULL,
inbox_id TEXT,
identity_update_proto BYTEA NOT NULL
);

--bun:split

CREATE INDEX idx_address_log_address ON address_log(address);

--bun:split

CREATE TABLE address_lookup_cache (
address TEXT PRIMARY KEY,
inbox_id TEXT,
);

0 comments on commit c9cd25c

Please sign in to comment.