Add schema and define algorithm for updates
richardhuaaa committed Apr 12, 2024
1 parent 3fa8f93 commit 45989cc
Showing 3 changed files with 59 additions and 4 deletions.
26 changes: 26 additions & 0 deletions pkg/identity/api/v1/service.go
@@ -42,13 +42,39 @@ func (s *Service) Close() {
}

func (s *Service) PublishIdentityUpdate(ctx context.Context, req *api.PublishIdentityUpdateRequest) (*api.PublishIdentityUpdateResponse, error) {
// Algorithm:
// Note: the inbox_log table has global ordering via a serial sequence_id, and an index on inbox_id
// 1. Append the update to the DB under inbox_id with commit_status UNCOMMITTED
// 2. ProcessLog(inbox_id):
//    - Read the log for the inbox_id
//    - Validate the log sequentially
//    - Update UNCOMMITTED rows to either VALIDATED or delete them based on the validation result
//    - For each row that is VALIDATED:
//      - Add it to the relevant address log.
//      - Note: There may be races between multiple ProcessLog() calls on the same inbox, or across multiple inboxes.
//        The address log can use a unique index on inbox_log_sequence_id to prevent duplicate updates and establish ordering.
//      - Process the address log and cache the inbox_id in a third table (address_lookup_cache)
//      - Note: To prevent new data overwriting old data, the address_lookup_cache stores the inbox_log_sequence_id, and we do
//        an atomic update WHERE new_sequence_id > old_sequence_id
//    - Update the row in the inbox_log table to COMMITTED
// 3. Return success from the API if the original identity update was COMMITTED, else return error
//
// If the server goes down in the middle of processing an update, subsequent ProcessLog() calls will pick up where the previous one left off.
// The client is expected to retry with the same payload, and the server is expected to deduplicate the update.

return nil, status.Errorf(codes.Unimplemented, "unimplemented")
}
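As a rough sketch of how steps 1 and 2 of the algorithm above could map onto the inbox_log schema, the snippet below appends an update as UNCOMMITTED and then replays the log for that inbox. It is illustrative only: the package name, the commit_status encoding, and the validate callback are assumptions, not part of this commit.

```go
// Illustrative sketch only; not part of this commit. The package name,
// commit_status encoding, and validate callback are assumed for the example.
package sketch

import (
	"context"
	"database/sql"
)

const (
	statusUncommitted int16 = 0 // assumed encoding of commit_status
	statusValidated   int16 = 1
	statusCommitted   int16 = 2
)

// appendUpdate implements step 1: append the update under inbox_id as UNCOMMITTED.
func appendUpdate(ctx context.Context, db *sql.DB, inboxID string, update []byte, nowNs int64) (int64, error) {
	var sequenceID int64
	err := db.QueryRowContext(ctx,
		`INSERT INTO inbox_log (inbox_id, commit_status, server_timestamp_ns, identity_update_proto)
		 VALUES ($1, $2, $3, $4) RETURNING sequence_id`,
		inboxID, statusUncommitted, nowNs, update).Scan(&sequenceID)
	return sequenceID, err
}

// processLog mirrors step 2: replay the log for one inbox in sequence_id order,
// validating UNCOMMITTED rows and deleting the ones that fail validation.
func processLog(ctx context.Context, db *sql.DB, inboxID string, validate func(update []byte) error) error {
	rows, err := db.QueryContext(ctx,
		`SELECT sequence_id, commit_status, identity_update_proto
		   FROM inbox_log WHERE inbox_id = $1 ORDER BY sequence_id`, inboxID)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var (
			sequenceID int64
			status     int16
			update     []byte
		)
		if err := rows.Scan(&sequenceID, &status, &update); err != nil {
			return err
		}
		if status != statusUncommitted {
			continue // already handled by an earlier ProcessLog pass
		}
		if validationErr := validate(update); validationErr != nil {
			// Rows that fail validation are deleted rather than left UNCOMMITTED.
			if _, delErr := db.ExecContext(ctx,
				`DELETE FROM inbox_log WHERE sequence_id = $1`, sequenceID); delErr != nil {
				return delErr
			}
			continue
		}
		if _, updErr := db.ExecContext(ctx,
			`UPDATE inbox_log SET commit_status = $1 WHERE sequence_id = $2`,
			statusValidated, sequenceID); updErr != nil {
			return updErr
		}
		// Next: append to address_log, refresh address_lookup_cache, then mark the row COMMITTED.
	}
	return rows.Err()
}
```

The address_log and address_lookup_cache steps, and the final transition to COMMITTED, are elided here; the cache update is sketched after the migration file below.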

func (s *Service) GetIdentityUpdates(ctx context.Context, req *api.GetIdentityUpdatesRequest) (*api.GetIdentityUpdatesResponse, error) {
// Algorithm:
// 1. Query the relevant inbox_log tables, filtering to COMMITTED rows
// 2. Return the updates in the response
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
}
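Continuing the same hypothetical sketch (same assumed package, imports, and commit_status constants), the read path described above reduces to a filtered, ordered query:

```go
// Illustrative sketch only; reads the committed identity updates for one
// inbox in sequence order, as described in the comment above.
func committedUpdates(ctx context.Context, db *sql.DB, inboxID string) ([][]byte, error) {
	rows, err := db.QueryContext(ctx,
		`SELECT identity_update_proto FROM inbox_log
		  WHERE inbox_id = $1 AND commit_status = $2
		  ORDER BY sequence_id`, inboxID, statusCommitted)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var updates [][]byte
	for rows.Next() {
		var update []byte
		if err := rows.Scan(&update); err != nil {
			return nil, err
		}
		updates = append(updates, update)
	}
	return updates, rows.Err()
}
```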

func (s *Service) GetInboxIds(ctx context.Context, req *api.GetInboxIdsRequest) (*api.GetInboxIdsResponse, error) {
// Algorithm:
// 1. Query the address_lookup_cache for each address
// 2. Return the result
return nil, status.Errorf(codes.Unimplemented, "unimplemented")
}
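The address lookup described above is likewise a single point read against the cache table; a minimal sketch under the same assumptions:

```go
// Illustrative sketch only; resolves one address to an inbox_id via
// address_lookup_cache. A missing row means no committed association exists.
func lookupInboxID(ctx context.Context, db *sql.DB, address string) (string, bool, error) {
	var inboxID string
	err := db.QueryRowContext(ctx,
		`SELECT inbox_id FROM address_lookup_cache WHERE address = $1`, address).Scan(&inboxID)
	if err == sql.ErrNoRows {
		return "", false, nil
	}
	if err != nil {
		return "", false, err
	}
	return inboxID, true, nil
}
```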
8 changes: 6 additions & 2 deletions pkg/migrations/mls/20240411200242_init-identity.down.sql
@@ -2,8 +2,12 @@ SET statement_timeout = 0;

--bun:split

-SELECT 1
+DROP TABLE IF EXISTS inbox_log;

--bun:split

-SELECT 2
+DROP TABLE IF EXISTS address_log;
+
+--bun:split
+
+DROP TABLE IF EXISTS address_lookup_cache;
29 changes: 27 additions & 2 deletions pkg/migrations/mls/20240411200242_init-identity.up.sql
@@ -2,8 +2,33 @@ SET statement_timeout = 0;

--bun:split

-SELECT 1
+CREATE TABLE inbox_log (
+	sequence_id BIGSERIAL PRIMARY KEY,
+	inbox_id TEXT NOT NULL,
+	commit_status SMALLINT NOT NULL,
+	server_timestamp_ns BIGINT NOT NULL,
+	identity_update_proto BYTEA NOT NULL
+);

--bun:split

-SELECT 2
+CREATE INDEX idx_inbox_log_inbox_id_commit_status ON inbox_log(inbox_id, commit_status);
+
+--bun:split
+
+CREATE TABLE address_log (
+	inbox_log_sequence_id BIGINT PRIMARY KEY,
+	address TEXT NOT NULL,
+	identity_update_proto BYTEA NOT NULL
+);
+
+--bun:split
+
+CREATE INDEX idx_address_log_address ON address_log(address);
+
+--bun:split
+
+CREATE TABLE address_lookup_cache (
+	address TEXT PRIMARY KEY,
+	inbox_id TEXT
+);
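One note on address_lookup_cache: the algorithm comment in service.go says the cache stores the inbox_log_sequence_id so that stale writes can be rejected with an atomic WHERE new_sequence_id > old_sequence_id update, but no such column appears in this migration. The sketch below assumes an extra inbox_log_sequence_id BIGINT column and shows what that conditional upsert could look like in Postgres; it is not part of this commit.

```go
// Illustrative sketch only; assumes address_lookup_cache gains an
// inbox_log_sequence_id BIGINT column, which this migration does not create.
// The conditional upsert keeps the newest mapping even if ProcessLog calls race.
func refreshLookupCache(ctx context.Context, db *sql.DB, address, inboxID string, sequenceID int64) error {
	_, err := db.ExecContext(ctx, `
		INSERT INTO address_lookup_cache (address, inbox_id, inbox_log_sequence_id)
		VALUES ($1, $2, $3)
		ON CONFLICT (address) DO UPDATE
		   SET inbox_id = EXCLUDED.inbox_id,
		       inbox_log_sequence_id = EXCLUDED.inbox_log_sequence_id
		 WHERE address_lookup_cache.inbox_log_sequence_id < EXCLUDED.inbox_log_sequence_id`,
		address, inboxID, sequenceID)
	return err
}
```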
