From 6eeee82f0aa76b5a06a2c25de519c0021b83074b Mon Sep 17 00:00:00 2001 From: David Edey Date: Fri, 2 Aug 2024 17:32:32 +0100 Subject: [PATCH] example: More powerful non fungible ids query --- .../EntityAddress.cs | 3 + .../Services/EntityStateQuerier.cs | 77 ++---- .../NonFungibleIdsInResourcePageQuery.cs | 246 ++++++++++++++++++ .../Services/PageQueryTools/page_querying.md | 46 ++++ 4 files changed, 319 insertions(+), 53 deletions(-) create mode 100644 src/RadixDlt.NetworkGateway.PostgresIntegration/Services/PageQueryTools/NonFungibleIdsInResourcePageQuery.cs create mode 100644 src/RadixDlt.NetworkGateway.PostgresIntegration/Services/PageQueryTools/page_querying.md diff --git a/src/RadixDlt.NetworkGateway.Abstractions/EntityAddress.cs b/src/RadixDlt.NetworkGateway.Abstractions/EntityAddress.cs index 0d7a31edb..37e44154a 100644 --- a/src/RadixDlt.NetworkGateway.Abstractions/EntityAddress.cs +++ b/src/RadixDlt.NetworkGateway.Abstractions/EntityAddress.cs @@ -76,6 +76,9 @@ private EntityAddress(string address) _address = address; } + // See https://radixdlt.atlassian.net/wiki/spaces/S/pages/3045556302/REP-71+-+Address+Formats+and+Vanity+Addresses + public bool IsNonFungibleResource => _address.Contains("1ng") || _address.Contains("1nf") || _address.Contains("1n2") || _address.Contains("1nt"); + public bool IsInternal => _address.StartsWith("internal_"); public bool IsGlobal => !IsInternal; diff --git a/src/RadixDlt.NetworkGateway.PostgresIntegration/Services/EntityStateQuerier.cs b/src/RadixDlt.NetworkGateway.PostgresIntegration/Services/EntityStateQuerier.cs index cc0cc1172..b329c66e4 100644 --- a/src/RadixDlt.NetworkGateway.PostgresIntegration/Services/EntityStateQuerier.cs +++ b/src/RadixDlt.NetworkGateway.PostgresIntegration/Services/EntityStateQuerier.cs @@ -74,6 +74,7 @@ using RadixDlt.NetworkGateway.GatewayApi.Exceptions; using RadixDlt.NetworkGateway.GatewayApi.Services; using RadixDlt.NetworkGateway.PostgresIntegration.Models; +using RadixDlt.NetworkGateway.PostgresIntegration.Services.PageQueryTools; using System; using System.Collections.Generic; using System.Collections.Immutable; @@ -112,8 +113,6 @@ private record ValidatorCurrentStakeViewModel( private record RoyaltyVaultBalanceViewModel(long RoyaltyVaultEntityId, string Balance, long OwnerEntityId, long LastUpdatedAtStateVersion); - private record NonFungibleIdsViewModel(long Id, long FromStateVersion, string NonFungibleId); - private record struct ExplicitMetadataLookup(long EntityId, string MetadataKey); private readonly TokenAmount _tokenAmount100 = TokenAmount.FromDecimalString("100"); @@ -590,62 +589,34 @@ public EntityStateQuerier( int pageSize, CancellationToken token = default) { - var entity = await GetEntity(nonFungibleResourceAddress, ledgerState, token); - - var cd = new CommandDefinition( - commandText: @" -SELECT - d.id AS Id, - d.from_state_version AS FromStateVersion, - d.non_fungible_id AS NonFungibleId -FROM non_fungible_id_definition d -INNER JOIN LATERAL ( - SELECT * - FROM non_fungible_id_data_history - WHERE non_fungible_id_definition_id = d.id AND from_state_version <= @stateVersion - ORDER BY from_state_version DESC - LIMIT 1 - ) h ON TRUE -WHERE - d.non_fungible_resource_entity_id = @nonFungibleResourceEntityId - AND (d.from_state_version, d.id) <= (@cursorStateVersion, @cursorId) - AND d.from_state_version <= @stateVersion - AND h.is_deleted = false -ORDER BY d.from_state_version DESC, d.id DESC -LIMIT @limit -;", - parameters: new - { - nonFungibleResourceEntityId = entity.Id, - stateVersion = ledgerState.StateVersion, - cursorStateVersion = cursor?.StateVersionBoundary ?? long.MaxValue, - cursorId = cursor?.IdBoundary ?? long.MaxValue, - limit = pageSize + 1, - }, - cancellationToken: token); - - var entriesAndOneMore = (await _dapperWrapper.QueryAsync(_dbContext.Database.GetDbConnection(), cd)) - .ToList(); - - var nextCursor = entriesAndOneMore.Count == pageSize + 1 - ? new GatewayModel.IdBoundaryCoursor(entriesAndOneMore.Last().FromStateVersion, entriesAndOneMore.Last().Id).ToCursorString() - : null; - - var supplyHistory = await _dbContext.ResourceEntitySupplyHistory.FirstOrDefaultAsync(x => x.ResourceEntityId == entity.Id, token); - long totalCount = supplyHistory != null ? long.Parse(supplyHistory.TotalSupply.ToString()) : 0; + if (!nonFungibleResourceAddress.IsNonFungibleResource) + { + throw new InvalidEntityException(nonFungibleResourceAddress.ToString()); + } - var items = entriesAndOneMore - .Take(pageSize) - .Select(vm => vm.NonFungibleId) - .ToList(); + var pages = await NonFungibleIdsInResourcePageQuery.ReadPages( + _dbContext, + _dapperWrapper, + ledgerState, + new List { nonFungibleResourceAddress }, + new NonFungibleIdsInResourcePageQuery.PageParameters( + ExclusiveCursor: cursor, + IsAscending: false, + IncludeDeleted: true, + IncludeValue: false, + MaxPageSize: pageSize, + MaxDefinitionsToRead: 10000000), + token); + + if (!pages.TryGetValue(nonFungibleResourceAddress, out var page)) + { + throw new EntityNotFoundException(nonFungibleResourceAddress.ToString()); + } return new GatewayModel.StateNonFungibleIdsResponse( ledgerState: ledgerState, resourceAddress: nonFungibleResourceAddress, - nonFungibleIds: new GatewayModel.NonFungibleIdsCollection( - totalCount: totalCount, - nextCursor: nextCursor, - items: items)); + nonFungibleIds: page.ToNonFungibleIdsCollection()); } public async Task NonFungibleIdData( diff --git a/src/RadixDlt.NetworkGateway.PostgresIntegration/Services/PageQueryTools/NonFungibleIdsInResourcePageQuery.cs b/src/RadixDlt.NetworkGateway.PostgresIntegration/Services/PageQueryTools/NonFungibleIdsInResourcePageQuery.cs new file mode 100644 index 000000000..54a9172b2 --- /dev/null +++ b/src/RadixDlt.NetworkGateway.PostgresIntegration/Services/PageQueryTools/NonFungibleIdsInResourcePageQuery.cs @@ -0,0 +1,246 @@ +using Dapper; +using Microsoft.EntityFrameworkCore; +using RadixDlt.NetworkGateway.Abstractions; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using GatewayModel = RadixDlt.NetworkGateway.GatewayApiSdk.Model; + +// ReSharper disable NotAccessedPositionalProperty.Global +namespace RadixDlt.NetworkGateway.PostgresIntegration.Services.PageQueryTools; + +internal static class NonFungibleIdsInResourcePageQuery +{ + internal readonly record struct PageParameters( + GatewayModel.IdBoundaryCoursor? ExclusiveCursor, + bool IsAscending, + bool IncludeDeleted, + bool IncludeValue, + int MaxPageSize, + int MaxDefinitionsToRead + ); + + // Raw result form the query - easiest to keep this relatively + // standardised across different PageQuerys, but you might need + // to change the type of Key, Value and the Totals + private readonly record struct QueryResultRow( + long EntityId, + string EntityAddress, + long TotalEntriesExcludingDeleted, + long TotalEntriesIncludingDeleted, + bool FilterOut, + long? DefinitionId, + string? Key, + long? KeyFirstSeenStateVersion, + byte[]? Value, + bool? IsLocked, + bool? IsDeleted, + long? LastUpdatedStateVersion, + long? NextExclusiveCursorStateVersion, + long? NextExclusiveCursorDefinitionId + ); + + // Query-specific results model - mapping QueryResultRow back out + internal readonly record struct PerEntityResult( + long EntityId, + EntityAddress NonFungibleEntityAddress, + GatewayModel.IdBoundaryCoursor? NextCursor, + long TotalEntriesGivenPagingParameters, + long TotalMinted, + long TotalSupply, + List PageItems + ) + { + internal GatewayModel.NonFungibleIdsCollection ToNonFungibleIdsCollection() + { + return new GatewayModel.NonFungibleIdsCollection( + totalCount: TotalEntriesGivenPagingParameters, + nextCursor: NextCursor?.ToCursorString(), + items: PageItems.Select(i => i.NonFungibleId).ToList()); + } + } + + internal readonly record struct PageItem( + long DefinitionId, + string NonFungibleId, + long KeyFirstSeenStateVersion, + byte[]? Data, + bool IsLocked, + bool IsDeleted, + long DataLastUpdatedStateVersion + ); + + internal static async Task> ReadPages( + ReadOnlyDbContext dbContext, + IDapperWrapper dapperWrapper, + GatewayModel.LedgerState ledgerState, + List nonFungibleResources, + PageParameters pageParameters, + CancellationToken token = default) + { + // See `page_querying.md` for details about how this query structure works + var queryParameters = new + { + rootEntityAddresses = nonFungibleResources, + useCursor = pageParameters.ExclusiveCursor is not null, + stateVersion = ledgerState.StateVersion, + exclusiveCursorStateVersion = pageParameters.ExclusiveCursor?.StateVersionBoundary ?? 0, + exclusiveCursorDefinitionId = pageParameters.ExclusiveCursor?.IdBoundary ?? 0, + pageLimit = pageParameters.MaxPageSize, + definitionReadLimit = pageParameters.MaxDefinitionsToRead, + }; + + var commandDefinition = new CommandDefinition( + commandText: $@" +WITH vars AS ( + SELECT + CAST(@rootEntityAddresses AS text[]) AS entity_addresses, + -- If use_cursor is false, the cursor is ignored, so just set it to (0, 0) + CAST(@useCursor AS bool) AS use_cursor, + -- This cursor is (from_state_version, definition_id) exclusive + ROW(CAST(@exclusiveCursorStateVersion AS bigint), CAST(@exclusiveCursorDefinitionId AS bigint)) AS start_cursor_exclusive, + CAST(@stateVersion AS bigint) AS current_state_version +), +definitions_with_cursor AS ( + SELECT + d.*, + (d.from_state_version, d.id) AS cursor + FROM non_fungible_id_definition d +), +entries_per_entity AS ( + SELECT + entities.id AS EntityId, + entities.address AS EntityAddress, + entity_totals.total_entries_excluding_deleted AS TotalEntriesExcludingDeleted, + entity_totals.total_entries_including_deleted AS TotalEntriesIncludingDeleted, + COALESCE(filter_out, TRUE) AS FilterOut, + CASE WHEN COALESCE(filter_out, TRUE) THEN NULL ELSE definition_id END AS DefinitionId, + CASE WHEN COALESCE(filter_out, TRUE) THEN NULL ELSE key END AS Key, + CASE WHEN COALESCE(filter_out, TRUE) THEN NULL ELSE key_first_seen_state_version END AS KeyFirstSeenStateVersion, + CASE WHEN COALESCE(filter_out, TRUE) THEN NULL ELSE value END AS Value, + CASE WHEN COALESCE(filter_out, TRUE) THEN NULL ELSE is_locked END AS IsLocked, + CASE WHEN COALESCE(filter_out, TRUE) THEN NULL ELSE is_deleted END AS IsDeleted, + CASE WHEN COALESCE(filter_out, TRUE) THEN NULL ELSE last_updated_state_version END AS LastUpdatedStateVersion, + next_cursor_exclusive.f1 AS NextExclusiveCursorStateVersion, + next_cursor_exclusive.f2 AS NextExclusiveCursorDefinitionId + FROM vars + INNER JOIN LATERAL ( + SELECT + UNNEST(vars.entity_addresses) AS address + ) addresses ON TRUE + INNER JOIN entities + ON e.address = addresses.address + AND e.from_state_version <= vars.current_state_version + -- In general, this can be replaced by some XXX_totals_history table, or removed if we don't have any relevant totals table + INNER JOIN LATERAL ( + SELECT + t.total_supply AS total_entries_excluding_deleted, + t.total_minted AS total_entries_including_deleted + FROM resource_entity_supply_history t + WHERE + t.resource_entity_id = entities.id + AND t.from_state_version <= vars.current_state_version + ORDER BY + t.from_state_version DESC + LIMIT 1 + ) entity_totals ON TRUE + LEFT JOIN LATERAL ( -- LEFT JOIN so we always return a row where we can join on the totals + SELECT + definitions.id as definition_id, + definitions.non_fungible_id, + definitions.key_first_seen_state_version, + definitions.cursor, + entries.*, + CASE WHEN + -- Add cursor to last row returned only + -- > EITHER because we have filled a page (row num = limit) + -- > OR because we have reached the last sub-query item (definitions.is_last_subquery_item) + -- + -- NOTE: The last row should be ignored if filter_out is TRUE - in which case it's just being returned for the cursor + (ROW_NUMBER() OVER (ORDER BY definitions.cursor {(pageParameters.IsAscending ? "ASC" : "DESC")})) = @pageLimit + OR definitions.is_last_subquery_item + THEN definitions.cursor ELSE NULL END AS next_cursor_exclusive + FROM ( + SELECT + d.id AS id, + d.non_fungible_id AS key, -- The key + d.from_state_version AS key_first_seen_state_version, + d.cursor, + (ROW_NUMBER() OVER (ORDER BY d.cursor {(pageParameters.IsAscending ? "ASC" : "DESC")})) = @definitionReadLimit AS is_last_subquery_item + FROM definitions_with_cursor d + WHERE + d.non_fungible_resource_entity_id = entities.id + AND ( + (NOT vars.use_cursor) OR + d.cursor {(pageParameters.IsAscending ? ">" : "<")} vars.start_cursor_exclusive + ) + ORDER BY + d.cursor {(pageParameters.IsAscending ? "ASC" : "DESC")} + LIMIT @definitionReadLimit + ) definitions + INNER JOIN LATERAL ( + SELECT + h.from_state_version AS last_updated_state_version, + {(pageParameters.IncludeValue ? "NULL" : "h.data")} AS value, + h.is_locked, + h.is_deleted, + {(pageParameters.IncludeDeleted ? "TRUE" : "h.is_deleted")} AS filter_out + FROM non_fungible_id_data_history h + WHERE + h.non_fungible_id_definition_id = definitions.id + AND h.from_state_version <= vars.current_state_version + ORDER BY + h.from_state_version DESC + LIMIT 1 + ) entries ON TRUE + WHERE + (NOT entries.filter_out) + OR definitions.is_last_subquery_item + ORDER BY + definitions.cursor {(pageParameters.IsAscending ? "ASC" : "DESC")} + LIMIT @pageLimit + ) entries_per_entity ON TRUE +) +SELECT * FROM entries_per_entity +;", + parameters: queryParameters, + cancellationToken: token); + + var results = await dapperWrapper.QueryAsync(dbContext.Database.GetDbConnection(), commandDefinition); + + // NOTE: In some other instances where we have sub-pages, we may need to find roots for sub-pages here + // and do a call to load them as a dictionary, before creating the data models, reading off the sub-page roots. + + return results + .GroupBy(r => r.EntityId) + .Select(g => + { + var rows = g.ToList(); + var finalRow = rows.Last(); + var nextCursor = finalRow.NextExclusiveCursorStateVersion.HasValue + ? new GatewayModel.IdBoundaryCoursor(finalRow.NextExclusiveCursorStateVersion, finalRow.NextExclusiveCursorDefinitionId) + : null; + return new PerEntityResult( + EntityId: finalRow.EntityId, + NonFungibleEntityAddress: (EntityAddress)finalRow.EntityAddress, + NextCursor: nextCursor, + TotalEntriesGivenPagingParameters: pageParameters.IncludeDeleted ? finalRow.TotalEntriesIncludingDeleted : finalRow.TotalEntriesExcludingDeleted, + TotalMinted: finalRow.TotalEntriesIncludingDeleted, + TotalSupply: finalRow.TotalEntriesExcludingDeleted, + PageItems: rows + .Where(f => !f.FilterOut) + .Select(row => new PageItem( + DefinitionId: row.DefinitionId!.Value, + NonFungibleId: row.Key!, + KeyFirstSeenStateVersion: row.KeyFirstSeenStateVersion!.Value, + Data: row.Value, // Will be null if !IncludeValue + IsLocked: row.IsLocked!.Value, + IsDeleted: row.IsDeleted!.Value, + DataLastUpdatedStateVersion: row.LastUpdatedStateVersion!.Value + )).ToList() + ); + }) + .ToDictionary(r => r.NonFungibleEntityAddress); + } +} diff --git a/src/RadixDlt.NetworkGateway.PostgresIntegration/Services/PageQueryTools/page_querying.md b/src/RadixDlt.NetworkGateway.PostgresIntegration/Services/PageQueryTools/page_querying.md new file mode 100644 index 000000000..fe7c719fb --- /dev/null +++ b/src/RadixDlt.NetworkGateway.PostgresIntegration/Services/PageQueryTools/page_querying.md @@ -0,0 +1,46 @@ +## Page Query Conventions + +Each file has a single method which takes parameters as follows and returns a `Task>`. + +```csharp + ReadOnlyDbContext dbContext, + IDapperWrapper dapperWrapper, + GatewayModel.LedgerState ledgerState, + List rootEntityAddresses, + PageParameters pageParameters, + CancellationToken token = default +``` + +The `PageParameters` should be a local `record struct` which should include: +* An optional `GatewayModel.IdBoundaryCoursor?` cursor +* A max page size +* A max definition read limit (to bound the work done by the query) +* ... `XYZ.PageParameters` for any internal pages + +The method should perform a structured query similar to `NonFungibleIdsInResourcePageQuery` to pull all the pages of data from the database in one round trip. + +It should combine this with calls to other Page Query tools for sub-collections; and then unify these together to construct the data model. + +How the query works: + +* It assumes a structure with two (or three) tables: + * There is a "XXX_definitions" table, covering all unique keys observed in a collection `entity_id`. + This should have an index on `(entity_id, from_state_version, definition_id)`. + * There is a `XXX_entry_history" table, covering the history of each definition over time. + * (OPTIONAL) there is some kind of totals or aggregate table, such as `XXX_totals_history`. +* In one query, it efficiently returns: + * Pages of entries for each entity (subject to some bound on work performed) + * A next cursor for each of these pages + * Relevant totals associated with that entity +* It supports options to: + * Have an optional cursor + * Go ASC or DESC + * Include the value or not + * Include deleted entries or not +* In the result set, there will be at least one row for every entity which existed at the given state version. +* The last returned row for each entity is special: + * It has the cursor for possible further pagination (or `NULL` if none such exists) + * It includes totals (if they are uncommented) + * It may have `FilterOut` set to true - in which case, that row should not be returned (but the cursor/totals can still be used). + +You will need to take a base query such as the one in `NonFungibleIdsInResourcePageQuery` and change the relevant table/field names for the particular case you're looking at.