From 45989cc477888b1f073a0cdce1d90f534a17095f Mon Sep 17 00:00:00 2001 From: Richard Hua Date: Thu, 11 Apr 2024 18:31:07 -0700 Subject: [PATCH] Add schema and define algorithm for updates --- pkg/identity/api/v1/service.go | 26 +++++++++++++++++ .../mls/20240411200242_init-identity.down.sql | 8 +++-- .../mls/20240411200242_init-identity.up.sql | 29 +++++++++++++++++-- 3 files changed, 59 insertions(+), 4 deletions(-) diff --git a/pkg/identity/api/v1/service.go b/pkg/identity/api/v1/service.go index 2c7c0d45..1a6888ba 100644 --- a/pkg/identity/api/v1/service.go +++ b/pkg/identity/api/v1/service.go @@ -42,13 +42,39 @@ func (s *Service) Close() { } func (s *Service) PublishIdentityUpdate(ctx context.Context, req *api.PublishIdentityUpdateRequest) (*api.PublishIdentityUpdateResponse, error) { + // Algorithm: + // Note - the inbox_log table has global ordering via a serial sequence_ID, and an index by inbox_ID + // 1. Append update to DB under inbox_id with commit_status UNCOMMITTED + // 2. ProcessLog(inbox_id): + // - Read the log for the inbox_id + // - Validate log sequentially + // - Update UNCOMMITTED rows to either VALIDATED or delete them based on the validation result + // - For each row that is VALIDATED: + // - Add it to the relevant address log. + // - Note: There may be races between multiple ProcessLog() calls on the same inbox, or across multiple inboxes. + // The address log can use a unique index on inbox_log_sequence_ID to prevent duplicate updates and establish ordering. + // - Process the address log and cache the XID into a third table (address_lookup_cache) + // - Note: To prevent new data overwriting old data, the address_lookup_cache stores the inbox_log_sequence_id, and we do + // an atomic update WHERE new_sequence_id > old_sequence_id + // - Update the row in the inbox_id table to COMMITTED + // 3. 
Return success from the API if the original identity update was COMMITTED, else return error + // + // If the server goes down in the middle of processing an update, subsequent ProcessLog() calls will pick up where the previous one left off. + // The client is expected to retry with the same payload, and the server is expected to deduplicate the update. + return nil, status.Errorf(codes.Unimplemented, "unimplemented") } func (s *Service) GetIdentityUpdates(ctx context.Context, req *api.GetIdentityUpdatesRequest) (*api.GetIdentityUpdatesResponse, error) { + // Algorithm: + // 1. Query the relevant inbox_log tables, filtering to COMMITTED rows + // 2. Return the updates in the response return nil, status.Errorf(codes.Unimplemented, "unimplemented") } func (s *Service) GetInboxIds(ctx context.Context, req *api.GetInboxIdsRequest) (*api.GetInboxIdsResponse, error) { + // Algorithm: + // 1. Query the address_lookup_cache for each address + // 2. Return the result return nil, status.Errorf(codes.Unimplemented, "unimplemented") } diff --git a/pkg/migrations/mls/20240411200242_init-identity.down.sql b/pkg/migrations/mls/20240411200242_init-identity.down.sql index 87d60f3e..46b211c4 100644 --- a/pkg/migrations/mls/20240411200242_init-identity.down.sql +++ b/pkg/migrations/mls/20240411200242_init-identity.down.sql @@ -2,8 +2,12 @@ SET statement_timeout = 0; --bun:split -SELECT 1 +DROP TABLE IF EXISTS inbox_log; --bun:split -SELECT 2 +DROP TABLE IF EXISTS address_log; + +--bun:split + +DROP TABLE IF EXISTS address_lookup_cache; diff --git a/pkg/migrations/mls/20240411200242_init-identity.up.sql b/pkg/migrations/mls/20240411200242_init-identity.up.sql index 87d60f3e..56b3ad74 100644 --- a/pkg/migrations/mls/20240411200242_init-identity.up.sql +++ b/pkg/migrations/mls/20240411200242_init-identity.up.sql @@ -2,8 +2,33 @@ SET statement_timeout = 0; --bun:split -SELECT 1 +CREATE TABLE inbox_log ( + sequence_id BIGSERIAL PRIMARY KEY, + inbox_id TEXT NOT NULL, + commit_status 
SMALLINT NOT NULL, + server_timestamp_ns BIGINT NOT NULL, + identity_update_proto BYTEA NOT NULL +); --bun:split -SELECT 2 +CREATE INDEX idx_inbox_log_inbox_id_commit_status ON inbox_log(inbox_id, commit_status); + +--bun:split + +CREATE TABLE address_log ( + inbox_log_sequence_id BIGINT PRIMARY KEY, + address TEXT NOT NULL, + identity_update_proto BYTEA NOT NULL +); + +--bun:split + +CREATE INDEX idx_address_log_address ON address_log(address); + +--bun:split + +CREATE TABLE address_lookup_cache ( + address TEXT PRIMARY KEY, + inbox_id TEXT +);