diff --git a/pkg/identity/api/v1/service.go b/pkg/identity/api/v1/service.go index 71104ff7..6523d8a7 100644 --- a/pkg/identity/api/v1/service.go +++ b/pkg/identity/api/v1/service.go @@ -44,42 +44,42 @@ func (s *Service) Close() { /* Algorithm: -Note - the inbox_log table has global ordering via a serial sequence_ID, and an index by inbox_ID - -1. Append update to DB under inbox_id with commit_status UNCOMMITTED -2. ProcessLog(inbox_id): - - Read the log for the inbox_id - - Validate log sequentially - - Update UNCOMMITTED rows to either VALIDATED or delete them based on the validation result - - For each row that is VALIDATED: - - Add it to the relevant address log. - - Note: There may be races between multiple ProcessLog() calls on the same inbox, - or across multiple inboxes. The address log can use a unique index on - inbox_log_sequence_ID to prevent duplicate updates and establish ordering. - - Process the address log and cache the XID into a third table (address_lookup_cache) - - Note: To prevent new data overwriting old data, the address_lookup_cache stores the - inbox_log_sequence_id, and we do an atomic update WHERE new_sequence_id > old_sequence_id - - Update the row in the inbox_id table to COMMITTED -3. Return success from the API if the original identity update was COMMITTED, else return error - -If the server goes down in the middle of processing an update, subsequent ProcessLog() calls will -pick up where the previous one left off. -The client is expected to retry with the same payload, and the server is expected to deduplicate the update. +Start transaction + + 1. Insert the update into the inbox_log table + 2. Read the log for the inbox_id, ordering by sequence_id + 3. If the log has more than 256 entries, abort the transaction. + 3. Validate it sequentially. If failed, abort the transaction. + 4. For each affected address: + a. Insert the update into the address_log table + -- Note: There may be races across multiple inboxes. The address_log can use + -- inbox_log_sequence_ID to establish ordering. + b. Read the log for the address, ordering by *inbox_log_sequence_id* + c. If the log has more than 256 entries, abort the transaction. + c. Process the address log (without signature validation) and compute the final inbox ID + d. Update the inserted entry with the result. + +End transaction */ func (s *Service) PublishIdentityUpdate(ctx context.Context, req *api.PublishIdentityUpdateRequest) (*api.PublishIdentityUpdateResponse, error) { return nil, status.Errorf(codes.Unimplemented, "unimplemented") } func (s *Service) GetIdentityUpdates(ctx context.Context, req *api.GetIdentityUpdatesRequest) (*api.GetIdentityUpdatesResponse, error) { - // Algorithm: - // 1. Query the relevant inbox_log tables, filtering to COMMITTED rows - // 2. Return the updates in the response + /* + Algorithm for each request: + 1. Query the inbox_log table for the inbox_id, ordering by sequence_id + 2. Return all of the entries + */ return nil, status.Errorf(codes.Unimplemented, "unimplemented") } func (s *Service) GetInboxIds(ctx context.Context, req *api.GetInboxIdsRequest) (*api.GetInboxIdsResponse, error) { - // Algorithm: - // 1. Query the address_lookup_cache for each address - // 2. Return the result + /* + Algorithm for each request: + 1. Query the address_log table for the latest sequence_id on the address + -- Note: NOT inbox_log_sequence_id + 2. Return the value of the 'inbox_id' column + */ return nil, status.Errorf(codes.Unimplemented, "unimplemented") } diff --git a/pkg/migrations/mls/20240411200242_init-identity.down.sql b/pkg/migrations/mls/20240411200242_init-identity.down.sql index 46b211c4..6189ef52 100644 --- a/pkg/migrations/mls/20240411200242_init-identity.down.sql +++ b/pkg/migrations/mls/20240411200242_init-identity.down.sql @@ -7,7 +7,3 @@ DROP TABLE IF EXISTS inbox_log; --bun:split DROP TABLE IF EXISTS address_log; - ---bun:split - -DROP TABLE IF EXISTS address_lookup_cache; diff --git a/pkg/migrations/mls/20240411200242_init-identity.up.sql b/pkg/migrations/mls/20240411200242_init-identity.up.sql index 56b3ad74..d40246c4 100644 --- a/pkg/migrations/mls/20240411200242_init-identity.up.sql +++ b/pkg/migrations/mls/20240411200242_init-identity.up.sql @@ -5,30 +5,24 @@ SET statement_timeout = 0; CREATE TABLE inbox_log ( sequence_id BIGSERIAL PRIMARY KEY, inbox_id TEXT NOT NULL, - commit_status SMALLINT NOT NULL, server_timestamp_ns BIGINT NOT NULL, identity_update_proto BYTEA NOT NULL ); --bun:split -CREATE INDEX idx_inbox_log_inbox_id_commit_status ON inbox_log(inbox_id, commit_status); +CREATE INDEX idx_inbox_log_inbox_id ON inbox_log(inbox_id); --bun:split CREATE TABLE address_log ( - inbox_log_sequence_id BIGINT PRIMARY KEY, + sequence_id BIGSERIAL PRIMARY KEY, + inbox_log_sequence_id BIGINT, address TEXT NOT NULL, + inbox_id TEXT, identity_update_proto BYTEA NOT NULL ); --bun:split CREATE INDEX idx_address_log_address ON address_log(address); - ---bun:split - -CREATE TABLE address_lookup_cache ( - address TEXT PRIMARY KEY, - inbox_id TEXT, -);