From e367205e5041774a0ecfb9a2d5256c4919bbf6fa Mon Sep 17 00:00:00 2001 From: Dominik Toton <166132265+dtscalac@users.noreply.github.com> Date: Thu, 3 Oct 2024 19:06:41 +0200 Subject: [PATCH 1/4] fix(cat-voices): align colors with designs (#940) --- .../voices_no_internet_connection_banner.dart | 19 ++++++++++++++----- .../indicators/voices_status_indicator.dart | 9 +++++++-- .../assets/colors/colors.xml | 2 ++ .../lib/generated/colors.gen.dart | 6 ++++++ .../theme_extensions/voices_color_scheme.dart | 6 ++++++ .../lib/src/themes/catalyst.dart | 2 ++ 6 files changed, 37 insertions(+), 7 deletions(-) diff --git a/catalyst_voices/lib/widgets/indicators/voices_no_internet_connection_banner.dart b/catalyst_voices/lib/widgets/indicators/voices_no_internet_connection_banner.dart index a00ff3589d..57a4599609 100644 --- a/catalyst_voices/lib/widgets/indicators/voices_no_internet_connection_banner.dart +++ b/catalyst_voices/lib/widgets/indicators/voices_no_internet_connection_banner.dart @@ -35,12 +35,13 @@ class NoInternetConnectionBanner extends StatelessWidget { return LayoutBuilder( builder: (context, constraints) { final shouldButtonDisplay = constraints.maxWidth > 744; + final foregroundColor = Theme.of(context).colorScheme.errorContainer; return Container( padding: const EdgeInsets.symmetric(horizontal: 16, vertical: 8), margin: const EdgeInsets.all(16), decoration: BoxDecoration( - color: Theme.of(context).colors.avatarsError, + color: Theme.of(context).colorScheme.error, borderRadius: BorderRadius.circular(8), ), child: Row( @@ -56,20 +57,26 @@ class NoInternetConnectionBanner extends StatelessWidget { children: [ CatalystSvgIcon.asset( VoicesAssets.icons.wifi.path, - color: Theme.of(context).colors.iconsForeground, + color: foregroundColor, ), const SizedBox(width: 8), Expanded( child: Text( context.l10n.noConnectionBannerTitle, - style: Theme.of(context).textTheme.titleLarge, + style: Theme.of(context) + .textTheme + .titleLarge + ?.copyWith(color: 
foregroundColor), ), ), ], ), Text( context.l10n.noConnectionBannerDescription, - style: Theme.of(context).textTheme.bodySmall, + style: Theme.of(context) + .textTheme + .bodySmall + ?.copyWith(color: foregroundColor), softWrap: true, ), ], @@ -81,7 +88,9 @@ class NoInternetConnectionBanner extends StatelessWidget { onTap: onRefresh, child: Text( context.l10n.noConnectionBannerRefreshButtonText, - style: Theme.of(context).textTheme.labelLarge, + style: Theme.of(context).textTheme.labelLarge?.copyWith( + color: Theme.of(context).colors.onErrorVariant, + ), ), ), ], diff --git a/catalyst_voices/lib/widgets/indicators/voices_status_indicator.dart b/catalyst_voices/lib/widgets/indicators/voices_status_indicator.dart index 23ab44f899..679678d9c0 100644 --- a/catalyst_voices/lib/widgets/indicators/voices_status_indicator.dart +++ b/catalyst_voices/lib/widgets/indicators/voices_status_indicator.dart @@ -79,6 +79,11 @@ class _StatusContainer extends StatelessWidget { final theme = Theme.of(context); final backgroundColor = switch (type) { + VoicesStatusIndicatorType.success => theme.colors.success, + VoicesStatusIndicatorType.error => theme.colorScheme.error, + }; + + final foregroundColor = switch (type) { VoicesStatusIndicatorType.success => theme.colors.successContainer, VoicesStatusIndicatorType.error => theme.colors.errorContainer, }; @@ -92,11 +97,11 @@ class _StatusContainer extends StatelessWidget { alignment: Alignment.center, child: DefaultTextStyle( style: (theme.textTheme.titleLarge ?? 
const TextStyle()) - .copyWith(color: theme.colors.textPrimary), + .copyWith(color: foregroundColor), child: IconTheme( data: IconThemeData( size: 24, - color: theme.colors.textPrimary, + color: foregroundColor, ), child: child, ), diff --git a/catalyst_voices/packages/catalyst_voices_assets/assets/colors/colors.xml b/catalyst_voices/packages/catalyst_voices_assets/assets/colors/colors.xml index 080a0eb979..07ce203719 100644 --- a/catalyst_voices/packages/catalyst_voices_assets/assets/colors/colors.xml +++ b/catalyst_voices/packages/catalyst_voices_assets/assets/colors/colors.xml @@ -20,6 +20,7 @@ #61BFC8D9 #CC0000 #FFFFFF + #FFFFFF #FFD1D1 #700000 #218230 @@ -87,6 +88,7 @@ #364463 #FF9999 #380000 + #FFFFFF #AD0000 #FFD1D1 #BAEDC2 diff --git a/catalyst_voices/packages/catalyst_voices_assets/lib/generated/colors.gen.dart b/catalyst_voices/packages/catalyst_voices_assets/lib/generated/colors.gen.dart index 4b8b0c1d16..cdef49fdf0 100644 --- a/catalyst_voices/packages/catalyst_voices_assets/lib/generated/colors.gen.dart +++ b/catalyst_voices/packages/catalyst_voices_assets/lib/generated/colors.gen.dart @@ -82,6 +82,9 @@ class VoicesColors { /// Color: #FFD1D1 static const Color darkOnErrorContainer = Color(0xFFFFD1D1); + /// Color: #FFFFFF + static const Color darkOnErrorVariant = Color(0xFFFFFFFF); + /// Color: #0C288D static const Color darkOnPrimary = Color(0xFF0C288D); @@ -284,6 +287,9 @@ class VoicesColors { /// Color: #700000 static const Color lightOnErrorContainer = Color(0xFF700000); + /// Color: #FFFFFF + static const Color lightOnErrorVariant = Color(0xFFFFFFFF); + /// Color: #FFFFFF static const Color lightOnPrimary = Color(0xFFFFFFFF); diff --git a/catalyst_voices/packages/catalyst_voices_brands/lib/src/theme_extensions/voices_color_scheme.dart b/catalyst_voices/packages/catalyst_voices_brands/lib/src/theme_extensions/voices_color_scheme.dart index f4b3bb7339..ffb7218eba 100644 --- 
a/catalyst_voices/packages/catalyst_voices_brands/lib/src/theme_extensions/voices_color_scheme.dart +++ b/catalyst_voices/packages/catalyst_voices_brands/lib/src/theme_extensions/voices_color_scheme.dart @@ -62,6 +62,7 @@ class VoicesColorScheme extends ThemeExtension { final Color? primary98; final Color? primaryContainer; final Color? onPrimaryContainer; + final Color? onErrorVariant; final Color? errorContainer; final Color? onErrorContainer; @@ -121,6 +122,7 @@ class VoicesColorScheme extends ThemeExtension { required this.primary98, required this.primaryContainer, required this.onPrimaryContainer, + required this.onErrorVariant, required this.errorContainer, required this.onErrorContainer, }); @@ -182,6 +184,7 @@ class VoicesColorScheme extends ThemeExtension { this.primary98, this.primaryContainer, this.onPrimaryContainer, + this.onErrorVariant, this.errorContainer, this.onErrorContainer, }); @@ -243,6 +246,7 @@ class VoicesColorScheme extends ThemeExtension { Color? primary98, Color? primaryContainer, Color? onPrimaryContainer, + Color? onErrorVariant, Color? errorContainer, Color? onErrorContainer, }) { @@ -314,6 +318,7 @@ class VoicesColorScheme extends ThemeExtension { primary98: primary98 ?? this.primary98, primaryContainer: primaryContainer ?? this.primaryContainer, onPrimaryContainer: onPrimaryContainer ?? this.onPrimaryContainer, + onErrorVariant: onErrorVariant ?? this.onErrorVariant, errorContainer: errorContainer ?? this.errorContainer, onErrorContainer: onErrorContainer ?? 
this.onErrorContainer, ); @@ -447,6 +452,7 @@ class VoicesColorScheme extends ThemeExtension { primaryContainer: Color.lerp(primaryContainer, other.primaryContainer, t), onPrimaryContainer: Color.lerp(onPrimaryContainer, other.onPrimaryContainer, t), + onErrorVariant: Color.lerp(onErrorVariant, other.onErrorVariant, t), errorContainer: Color.lerp(errorContainer, other.errorContainer, t), onErrorContainer: Color.lerp(onErrorContainer, other.onErrorContainer, t), ); diff --git a/catalyst_voices/packages/catalyst_voices_brands/lib/src/themes/catalyst.dart b/catalyst_voices/packages/catalyst_voices_brands/lib/src/themes/catalyst.dart index b95e5f2106..63ddbc5408 100644 --- a/catalyst_voices/packages/catalyst_voices_brands/lib/src/themes/catalyst.dart +++ b/catalyst_voices/packages/catalyst_voices_brands/lib/src/themes/catalyst.dart @@ -81,6 +81,7 @@ const VoicesColorScheme darkVoicesColorScheme = VoicesColorScheme( primary98: VoicesColors.darkPrimary98, primaryContainer: VoicesColors.darkPrimaryContainer, onPrimaryContainer: VoicesColors.darkOnPrimaryContainer, + onErrorVariant: VoicesColors.darkOnErrorVariant, errorContainer: VoicesColors.darkErrorContainer, onErrorContainer: VoicesColors.darkOnErrorContainer, ); @@ -162,6 +163,7 @@ const VoicesColorScheme lightVoicesColorScheme = VoicesColorScheme( primary98: VoicesColors.lightPrimary98, primaryContainer: VoicesColors.lightPrimaryContainer, onPrimaryContainer: VoicesColors.lightOnPrimaryContainer, + onErrorVariant: VoicesColors.lightOnErrorVariant, errorContainer: VoicesColors.lightErrorContainer, onErrorContainer: VoicesColors.lightOnErrorContainer, ); From 4da289de58e468f9ba52c35628ce1e026db5803f Mon Sep 17 00:00:00 2001 From: Dominik Toton <166132265+dtscalac@users.noreply.github.com> Date: Thu, 3 Oct 2024 19:21:03 +0200 Subject: [PATCH 2/4] fix(cat-voices): migrate apple-mobile-web-app-capable to mobile-web-app-capable (#934) * refactor(cat-voices): migrate apple-mobile-web-app-capable to mobile-web-app-capable * 
chore: reformat --- .../packages/catalyst_voices_assets/example/web/index.html | 2 +- catalyst_voices/uikit_example/web/index.html | 2 +- catalyst_voices/web/index.html | 2 +- .../catalyst_cardano/catalyst_cardano/example/web/index.html | 2 +- utilities/catalyst_voices_remote_widgets/example/web/index.html | 2 +- utilities/poc_local_storage/web/index.html | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/catalyst_voices/packages/catalyst_voices_assets/example/web/index.html b/catalyst_voices/packages/catalyst_voices_assets/example/web/index.html index 1aa025dd68..29b58086ba 100644 --- a/catalyst_voices/packages/catalyst_voices_assets/example/web/index.html +++ b/catalyst_voices/packages/catalyst_voices_assets/example/web/index.html @@ -21,7 +21,7 @@ - + diff --git a/catalyst_voices/uikit_example/web/index.html b/catalyst_voices/uikit_example/web/index.html index 818d16800e..35b8895ff1 100644 --- a/catalyst_voices/uikit_example/web/index.html +++ b/catalyst_voices/uikit_example/web/index.html @@ -21,7 +21,7 @@ - + diff --git a/catalyst_voices/web/index.html b/catalyst_voices/web/index.html index cc5dc45b29..bf3dd0ddfd 100644 --- a/catalyst_voices/web/index.html +++ b/catalyst_voices/web/index.html @@ -22,7 +22,7 @@ - + diff --git a/catalyst_voices_packages/catalyst_cardano/catalyst_cardano/example/web/index.html b/catalyst_voices_packages/catalyst_cardano/catalyst_cardano/example/web/index.html index 4df009600f..62e096fcfe 100644 --- a/catalyst_voices_packages/catalyst_cardano/catalyst_cardano/example/web/index.html +++ b/catalyst_voices_packages/catalyst_cardano/catalyst_cardano/example/web/index.html @@ -22,7 +22,7 @@ - + diff --git a/utilities/catalyst_voices_remote_widgets/example/web/index.html b/utilities/catalyst_voices_remote_widgets/example/web/index.html index 1aa025dd68..29b58086ba 100644 --- a/utilities/catalyst_voices_remote_widgets/example/web/index.html +++ b/utilities/catalyst_voices_remote_widgets/example/web/index.html @@ -21,7 
+21,7 @@ - + diff --git a/utilities/poc_local_storage/web/index.html b/utilities/poc_local_storage/web/index.html index 992d45d70b..692b6d6bca 100644 --- a/utilities/poc_local_storage/web/index.html +++ b/utilities/poc_local_storage/web/index.html @@ -21,7 +21,7 @@ - + From 399731d94c33feb58725ea08ce170b7ac4d25fd6 Mon Sep 17 00:00:00 2001 From: Joshua Gilman Date: Thu, 3 Oct 2024 16:11:30 -0700 Subject: [PATCH 3/4] ci(cat-voices): adds deployment configuration (#943) --- .github/workflows/ci.yml | 3 ++- blueprint.cue | 13 +++++++++++++ catalyst_voices/blueprint.cue | 17 ++++++++++++++++- 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 81383e5e85..8459730cf3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,6 +3,7 @@ name: CI on: push: branches: [main] + tags: ['*'] pull_request: permissions: @@ -15,4 +16,4 @@ jobs: ci: uses: input-output-hk/catalyst-forge/.github/workflows/ci.yml@ci/v1.1.0 with: - forge_version: 0.2.0 \ No newline at end of file + forge_version: 0.2.1 \ No newline at end of file diff --git a/blueprint.cue b/blueprint.cue index 193d3916f6..5caa8a9be6 100644 --- a/blueprint.cue +++ b/blueprint.cue @@ -24,6 +24,11 @@ global: { path: "global/ci/docker" } + git: credentials: { + provider: "aws" + path: "global/ci/deploy" + } + earthly: { credentials: { provider: "aws" @@ -48,4 +53,12 @@ global: { strategy: "commit" } } + deployment: { + registry: ci.providers.aws.registry + repo: { + url: "https://github.com/input-output-hk/catalyst-world" + ref: "master" + } + root: "k8s" + } } diff --git a/catalyst_voices/blueprint.cue b/catalyst_voices/blueprint.cue index c76d4145ee..68b085fbee 100644 --- a/catalyst_voices/blueprint.cue +++ b/catalyst_voices/blueprint.cue @@ -1,2 +1,17 @@ version: "1.0.0" -project: name: "voices" +project: { + name: "voices" + deployment: { + environment: "dev" + modules: main: { + container: "voices-deployment" + version: "0.1.1" + 
values: { + environment: name: "dev" + frontend: image: { + tag: _ @env(name="GIT_IMAGE_TAG",type="string") + } + } + } + } +} From ed8dc74abe69d32c0f31d3d3862cd27c894dd8a0 Mon Sep 17 00:00:00 2001 From: Steven Johnson Date: Fri, 4 Oct 2024 10:14:45 +0700 Subject: [PATCH 4/4] feat(cat-gateway): Sync indexing and recovery (#917) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(docs): Fix up docs issues * fix(backend): Huge refactor to prep for scylladb config management * fix(backend): Clean up logging a little, and add build info logs as required for production. * Refactor and setup cassandra config/session * feat(backend): Index DB schema setup seems to work * WIP * fix(rust): Format fixes * fix(rust): Build fixes * fix(rust): Adjust index DB so we can index without querying, and can optimize on first detected spend. * fix(rust): add more docs * fix(rust): basic new follower integration * fix(rust): wip * fix(ci): Bump rust compiler version to match CI * ci(backend): Bump rust version to match CI * fix(backend): Fix code format and lints * feat(backend): simple new block indexer just to test the logic works * feat(gateway): Simple indexing with cassandra seems to work * refactor(backend): Remove lazy and once_cell in favor of new standard library replacements * fix(backend): WIP indexing for stake addresses and unstaked ada * fix(backend): indexing WIP * fix(backend): Add support for log control with env vars, default to mainnet, adjust `justfile` to properly select preprod and also refresh git dependencies. 
* feat(backend): Make local test scylla db run with 4 nodes, not 1 * fix(backend-lib): Add stop for cassandra db cluster * refactor(backend-lib): Remove c509-certificate because its moved to catalyst-libs * fix(backend): Remove dependencies from Workspace, and move into project * fix(backend): Use temporary cat-ci branch for rust builders * fix(backend): Remove obsolete common crates subdirectory * fix(backend): Don't use pre-packaged mithril snapshots in integration tests * fix(backend): Fix code so it builds with latest chain follower code. Also eliminates redundant logic now incorporated into chain follower. * fix(backend): Fix broken reference to catalyst libs * ci(ci): Bump all earthfiles to latest WIP cat-ci branch * fix(frontend-pkg): Ignore .dart_tool directory in frontend files checking markdown * fix(ci): Fix spelling * fix(spelling): Add more project words and properly sort list * fix(backend): Sync rust configs and add target to make it easier in future * fix(backend): Enable all features of Scylla for now. * fix(frontend-pkg): Fix markdown table having too many columns * ci(spelling): Fix spelling issues * fix(docs): Bump docs to latest WIP cat-ci version * feat(gateway): Add low resource scylla db instance for local testing * feat(gateway): Add and update developer convenience functions for backend * fix(backend): Fix code format * fix(backend): Fix spelling issues in CQL files * fix(spelling): Remove duplicates from the project words dictionary * fix(backend): Get the backend building properly with earthly. 
* feat(backend): remove obsoleted postgres logic for chain indexing * revert(event-db): Revert extension changes to sql files after fixing sqlfluff version * fix(frontend): Regenerate the dart api interface file, and add doing that to the pre-push just command * fix(backend): temporarily disable API tests * fix(backend): Also temporarily stop workflow consuming test reports that are disabled * fix(ci): Try and stop coveralls running for api-tests * ci(general): Replace temp CI branch with tagged release * feat: Add Handler for Permissionless Auth (#825) * docs(cips): Add Formal Defintion of auth token * fix(docs): Fix comments in cddl file * fix(docs): sig size * fix(docs): Rename CDDL for the auth token * docs(docs): Add auth-header documentation * docs(docs): Fix markdown line length error * docs(general): Fix spelling * fix(backend-lib): Bump to catalyst-libs tagged version * fix(backend): stub out obsolete code (to be removed in follow up PR). * fix(backend-lib): code format * fix(backend): remove unused crate dependencies * feat: auth token (#723) * feat(auth token encode and decode): permissionless auth * feat(auth token encode and decode): permissionless auth * feat(auth token encode and decode): permissionless auth * feat(auth token encode and decode): permissionless auth * feat(auth token encode and decode): permissionless auth * iron out tests * iron out tests * refactor(auth token encode and decode): ed25519 Signature cbor fields Sig over the preceding two fields - sig(cbor(kid), cbor(ulid)) * refactor(auth token encode and decode): ed25519 Signature cbor fields Sig over the preceding two fields - sig(cbor(kid), cbor(ulid)) * feat(cat security scheme): open api * feat(cat security scheme): open api * feat(mock cert state): given kid from bearer return pub key * feat(auth token): cache TTL * feat(auth token): cache TTL * feat(auth token): cache TT * ci(spell check): fix * ci(spell check): fix * ci(spell check): fix * refactor(clippy): housekeeping tidy * 
refactor(clippy): housekeeping tidy * refactor(clippy): housekeeping tidy * refactor(clippy): housekeeping tidy * fix(backend): Re-enable dependent crates used by this code * fix(backend): clippy lints * fix(backend): spelling --------- Co-authored-by: Steven Johnson Co-authored-by: Steven Johnson * feat: Update GET staked_ada endpoint to fetch from ScyllaDB (#728) * feat: get staked ada from scylladb * chore: revert justfile changes * chore: filter TXOs in rust instead of filtering in ScyllaDB query * fix(backend): spelling * fix(backend): Eliminate lint errors from Derived function * fix(backend): code format * fix(backend): Udate autogenerated dart code * chore(cat-voices): fix tests --------- Co-authored-by: Steven Johnson Co-authored-by: Steven Johnson Co-authored-by: Dominik Toton * feat: DB Indexing for CIP-36 registrations (#788) * feat: add schema for cip-36 registration tables * feat: index cip-36 by stake address * feat: index cip-36 registrations by vote key * fix: use TxiInserParams::new when adding txi data * fix: remove unused cfg attributes * fix: refactor Cip36RegistrationInsertQuery::new * fix(backend): Refactor queries and add multiple tables for cip36 registration indexes * fix(backend): Cip36 Primary key is stake key. Stake Key N->1 Vote Key * fix(backend): code format --------- Co-authored-by: Steven Johnson Co-authored-by: Steven Johnson * docs(general): Cleanup project dictionary * docs(spelling): Fix spelling * fix(backend): remove obsolete clippy lint cfg * docs(backend): Improve field documentation so its not ambiguous. * docs(backend): Fix comment * docs(backend): Improve comment * fix(backend): Vote Key index logic, and update comments * fix(backend): Earthfile needs to be executed from root of repo, to properly pick up secrets * fix(backend): make generic saturating value converter and use it instead of type specific ones * test(cat-gateway): Add tests for float conversion and better docs about functions limitations. 
* fix(cat-gateway): Developer lints in release mode, and also refer to correct local release binary * fix(cat-gateway): CIP36 index schema error * fix(cat-gateway): Cip36 indexing working, improve bad cassandra query reporting. * refactor(cat-gateway): Make settings a sub-module * refactor(cat-gateway): Break up Envvar handling into multiple files * refactor(cat-gateway): Fix code format * feat(cat-gateway): Add chain-sync downloader options to cat-gateway env vars * test(cat-gateway): Make debug logs visible in local testing * fix(cat-gateway): Minimum timeout for mithril downloads * test(cat-gateway): Silence gratuitous debug log in cassandra queries * refactor(cat-gateway): Decrease verboseness of the string env var logging code * fix(general): Bump alpine version and pin to fix missing upstream containers * ci(general): Bump cat-ci to v3.2.07 * ci(general): fix docker in docker container image * feat(cat-gateway): Autogenerate the cassandra schema version and add schema sync progress table * fix(cat-gateway): Bump rust library dependency versions * fix(cat-gateway): Make cassandra namespace compliant. * refactor(cat-gateway): rename utilities to utils * refactor(cat-gateway): use ensure! 
instead of if/bail * ci(general): Bump cat-ci version to latest * fix(flutter): Again, try and see if caching is working ok for flutter builds with cat-ci * ci(general): Bump CI to latest version to try and fix docs build issues * docs(cat-gateway): Better document the generate_cql_schema_version function * fix(cat-gateway): Use a string array reference rather than a vec of strings for strings to uuid generation * refactor(cat-gateway): fix code format * fix(cat-gateway): Bump ci version and update rust earthfile to check for cache breaks * test(cat-gateway): Add parameters better suited to syncing mainnet * feat(cat-gateway): Add indexing insert queries for blockchain sync state * fix(cat-gateway): Bump to cat-ci v3.2.13 * feat(cat-gateway): Sync status indexing and reading now working. * feat(cat-gateway): Add recoverable sync logic * fix(cat-gateway): Update CI to fix issue with SyncConfig * fix(cat-gateway): spelling * fix(cat-gateway): Adjust the chain indexing parameters and defaults * fix(cat-gateway): Fix the logic at the end of indexing the immutable chain. 
--------- Co-authored-by: cong-or <60357579+cong-or@users.noreply.github.com> Co-authored-by: Felipe Rosa Co-authored-by: Dominik Toton Co-authored-by: JoaquĆ­n Rosales --- .config/dictionaries/project.dic | 1 + Earthfile | 6 +- catalyst-gateway/Earthfile | 63 +- catalyst-gateway/Justfile | 4 +- catalyst-gateway/bin/src/cardano/mod.rs | 690 +++++++----------- .../db/index/queries/cql/get_sync_status.cql | 7 + .../index/queries/cql/insert_sync_status.cql | 12 + .../db/index/queries/cql/update_txo_spent.cql | 9 +- .../bin/src/db/index/queries/mod.rs | 33 +- .../src/db/index/queries/sync_status/get.rs | 193 +++++ .../src/db/index/queries/sync_status/mod.rs | 4 + .../db/index/queries/sync_status/update.rs | 122 ++++ .../src/db/index/schema/cql/sync_status.cql | 2 +- .../bin/src/db/index/schema/mod.rs | 2 +- catalyst-gateway/bin/src/db/index/session.rs | 21 +- .../src/service/api/cardano/staked_ada_get.rs | 3 + .../bin/src/settings/chain_follower.rs | 20 + catalyst-gateway/event-db/Earthfile | 2 +- catalyst-gateway/tests/Earthfile | 2 +- catalyst-gateway/tests/api_tests/Earthfile | 2 +- catalyst_voices/Earthfile | 2 +- catalyst_voices/uikit_example/Earthfile | 2 +- .../wallet-automation/Earthfile | 4 +- docs/Earthfile | 2 +- utilities/local-scylla/justfile | 2 +- 25 files changed, 749 insertions(+), 461 deletions(-) create mode 100644 catalyst-gateway/bin/src/db/index/queries/cql/get_sync_status.cql create mode 100644 catalyst-gateway/bin/src/db/index/queries/cql/insert_sync_status.cql create mode 100644 catalyst-gateway/bin/src/db/index/queries/sync_status/get.rs create mode 100644 catalyst-gateway/bin/src/db/index/queries/sync_status/mod.rs create mode 100644 catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs diff --git a/.config/dictionaries/project.dic b/.config/dictionaries/project.dic index f33072e0db..bfba1b49d9 100644 --- a/.config/dictionaries/project.dic +++ b/.config/dictionaries/project.dic @@ -239,6 +239,7 @@ Subkey submiting subosito svgs 
+syncable SYSROOT tablestats tacho diff --git a/Earthfile b/Earthfile index b41357596e..c3942962dd 100644 --- a/Earthfile +++ b/Earthfile @@ -1,8 +1,8 @@ VERSION 0.8 -IMPORT github.com/input-output-hk/catalyst-ci/earthly/mdlint:v3.2.10 AS mdlint-ci -IMPORT github.com/input-output-hk/catalyst-ci/earthly/cspell:v3.2.10 AS cspell-ci -IMPORT github.com/input-output-hk/catalyst-ci/earthly/postgresql:v3.2.10 AS postgresql-ci +IMPORT github.com/input-output-hk/catalyst-ci/earthly/mdlint:v3.2.14 AS mdlint-ci +IMPORT github.com/input-output-hk/catalyst-ci/earthly/cspell:v3.2.14 AS cspell-ci +IMPORT github.com/input-output-hk/catalyst-ci/earthly/postgresql:v3.2.14 AS postgresql-ci FROM debian:stable-slim diff --git a/catalyst-gateway/Earthfile b/catalyst-gateway/Earthfile index 16896b37f9..ec3861ec4d 100644 --- a/catalyst-gateway/Earthfile +++ b/catalyst-gateway/Earthfile @@ -1,19 +1,42 @@ VERSION 0.8 -IMPORT github.com/input-output-hk/catalyst-ci/earthly/rust:v3.2.10 AS rust-ci +IMPORT github.com/input-output-hk/catalyst-ci/earthly/rust:v3.2.14 AS rust-ci #cspell: words rustfmt toolsets USERARCH stdcfgs +# COPY_SRC: +# Copies the source code into the builder. +COPY_SRC: + FUNCTION + + COPY --keep-ts --dir \ + .cargo .config Cargo.toml clippy.toml deny.toml rustfmt.toml \ + bin \ + . + +# builder : Set up our target toolchains, and copy our files. +builder: + DO rust-ci+SETUP + # sync-cfg: Synchronize local config with CI version. # Must be run by the developer manually. sync-cfg: + FROM +builder DO rust-ci+SYNC_STD_CFG -# builder : Set up our target toolchains, and copy our files. -builder: - DO rust-ci+SETUP +builder-src: + FROM +builder + + # Don't build the src in the home directory itself, because it contains stuff. + WORKDIR $HOME/build + RUN rm -rf * - COPY --dir .cargo .config Cargo.* clippy.toml deny.toml rustfmt.toml bin . + # Cached copy of the source we build. 
+ DO +COPY_SRC + + # Generate Checksums of the source + DO rust-ci+FINGERPRINT_SRC + SAVE ARTIFACT ../src_fingerprint.txt ## ----------------------------------------------------------------------------- ## @@ -24,7 +47,7 @@ builder: # check : Run check using the most efficient host tooling # CI Automated Entry point. check: - FROM +builder + FROM +builder-src DO rust-ci+EXECUTE --cmd="/scripts/std_checks.py" @@ -36,12 +59,12 @@ all-hosts-check: # build : Build the catalyst-gateway service build: # Build the service - FROM +builder + FROM +builder-src DO rust-ci+EXECUTE \ --cmd="/scripts/std_build.py" \ --args1="--bins=cat-gateway/cat-gateway" \ - --args2="--cov_report=$HOME/coverage-report.info" \ + --args2="--cov_report=$HOME/build/coverage-report.info" \ --output="release/cat-gateway" \ --junit="cat-gateway.junit-report.xml" \ --coverage="cat-gateway.coverage.info" \ @@ -81,4 +104,26 @@ package-cat-gateway: get-api-locally: FROM scratch COPY +build/doc/cat-gateway-api.json cat-gateway-api.json - SAVE ARTIFACT cat-gateway-api.json AS LOCAL cat-gateway-api.json \ No newline at end of file + SAVE ARTIFACT cat-gateway-api.json AS LOCAL cat-gateway-api.json + +# build-src-check: Check for any caching issues with the source we are building against. +check-builder-src-cache: + FROM +builder + + # Don't build the src in the home directory itself, because it contains stuff. + WORKDIR $HOME/build + RUN rm -rf * + + COPY +builder-src/src_fingerprint.txt .. + + RUN --no-cache echo "Cache Disabled" + + # Uncached copy of the source we build. + DO +COPY_SRC + + # Generate Checksums of the source + DO rust-ci+FINGERPRINT_SRC --FINGERPRINT=src_fingerprint_uncached.txt + + RUN diff ../src_fingerprint.txt ../src_fingerprint_uncached.txt \ + || (echo "ERROR: Source fingerprints do not match. Caching Error Detected!!" && exit 1) \ + && echo "Source fingerprints match. Caching OK." 
diff --git a/catalyst-gateway/Justfile b/catalyst-gateway/Justfile index 724c79f5bc..703ec5f529 100644 --- a/catalyst-gateway/Justfile +++ b/catalyst-gateway/Justfile @@ -50,6 +50,8 @@ run-cat-gateway: build-cat-gateway # Run cat-gateway natively on mainnet run-cat-gateway-mainnet: build-cat-gateway - CHAIN_FOLLOWER_SYNC_TASKS="1" \ + CHAIN_FOLLOWER_SYNC_TASKS="32" \ + CHAIN_FOLLOWER_DL_CONNECTIONS="64" \ + CHAIN_FOLLOWER_DL_CHUNK_SIZE="4" \ RUST_LOG="error,cat_gateway=debug,cardano_chain_follower=debug,mithril-client=debug" \ ./target/release/cat-gateway run --log-level debug diff --git a/catalyst-gateway/bin/src/cardano/mod.rs b/catalyst-gateway/bin/src/cardano/mod.rs index 60ac5ae2ab..41e775865f 100644 --- a/catalyst-gateway/bin/src/cardano/mod.rs +++ b/catalyst-gateway/bin/src/cardano/mod.rs @@ -1,6 +1,6 @@ //! Logic for orchestrating followers -use std::{fmt::Display, time::Duration}; +use std::{fmt::Display, sync::Arc, time::Duration}; use cardano_chain_follower::{ ChainFollower, ChainSyncConfig, Network, Point, ORIGIN_POINT, TIP_POINT, @@ -8,10 +8,17 @@ use cardano_chain_follower::{ use duration_string::DurationString; use futures::{stream::FuturesUnordered, StreamExt}; use rand::{Rng, SeedableRng}; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use crate::{ - db::index::{block::index_block, session::CassandraSession}, + db::index::{ + block::index_block, + queries::sync_status::{ + get::{get_sync_status, SyncStatus}, + update::update_sync_status, + }, + session::CassandraSession, + }, settings::{chain_follower, Settings}, }; @@ -43,6 +50,7 @@ async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> { } /// Data we return from a sync task. +#[derive(Clone)] struct SyncParams { /// What blockchain are we syncing. chain: Network, @@ -52,8 +60,12 @@ struct SyncParams { end: Point, /// The first block we successfully synced. first_indexed_block: Option, + /// Is the starting point immutable? 
(True = immutable, false = don't know.) + first_is_immutable: bool, /// The last block we successfully synced. last_indexed_block: Option, + /// Is the ending point immutable? (True = immutable, false = don't know.) + last_is_immutable: bool, /// The number of blocks we successfully synced overall. total_blocks_synced: u64, /// The number of blocks we successfully synced, in the last attempt. @@ -63,7 +75,9 @@ struct SyncParams { /// The number of retries so far on this sync task. backoff_delay: Option, /// If the sync completed without error or not. - result: Option>, + result: Arc>>, + /// Chain follower roll forward. + follower_roll_forward: Option, } impl Display for SyncParams { @@ -77,11 +91,19 @@ impl Display for SyncParams { write!(f, "start: {}, end: {}", self.start, self.end)?; if let Some(first) = self.first_indexed_block.as_ref() { - write!(f, ", first_indexed_block: {first}")?; + write!( + f, + ", first_indexed_block: {first}{}", + if self.first_is_immutable { ":I" } else { "" } + )?; } if let Some(last) = self.last_indexed_block.as_ref() { - write!(f, ", last_indexed_block: {last}")?; + write!( + f, + ", last_indexed_block: {last}{}", + if self.last_is_immutable { ":I" } else { "" } + )?; } if self.retries > 0 { @@ -122,12 +144,15 @@ impl SyncParams { start, end, first_indexed_block: None, + first_is_immutable: false, last_indexed_block: None, + last_is_immutable: false, total_blocks_synced: 0, last_blocks_synced: 0, retries: 0, backoff_delay: None, - result: None, + result: Arc::new(None), + follower_roll_forward: None, } } @@ -148,36 +173,39 @@ impl SyncParams { }; } - Self { - chain: self.chain, - start: self.start.clone(), - end: self.end.clone(), - first_indexed_block: self.first_indexed_block.clone(), - last_indexed_block: self.last_indexed_block.clone(), - total_blocks_synced: self.total_blocks_synced, - last_blocks_synced: 0, - retries: retry_count, - backoff_delay: backoff, - result: None, - } + let mut retry = self.clone(); + 
retry.last_blocks_synced = 0; + retry.retries = retry_count; + retry.backoff_delay = backoff; + retry.result = Arc::new(None); + retry.follower_roll_forward = None; + + retry } /// Convert Params into the result of the sync. fn done( - &self, first: Option, last: Option, synced: u64, result: anyhow::Result<()>, + &self, first: Option, first_immutable: bool, last: Option, + last_immutable: bool, synced: u64, result: anyhow::Result<()>, ) -> Self { - Self { - chain: self.chain, - start: self.start.clone(), - end: self.end.clone(), - first_indexed_block: first, - last_indexed_block: last, - total_blocks_synced: synced + self.total_blocks_synced, - last_blocks_synced: synced, - retries: self.retries, - backoff_delay: self.backoff_delay, - result: Some(result), + if result.is_ok() && first_immutable && last_immutable { + // Update sync status in the Immutable DB. + // Can fire and forget, because failure to update DB will simply cause the chunk to be + // re-indexed, on recovery. + update_sync_status(self.end.slot_or_default(), self.start.slot_or_default()); } + + let mut done = self.clone(); + done.first_indexed_block = first; + done.first_is_immutable = first_immutable; + done.last_indexed_block = last; + done.last_is_immutable = last_immutable; + done.total_blocks_synced += synced; + done.last_blocks_synced = synced; + + done.result = Arc::new(Some(result)); + + done } /// Get where this sync run actually needs to start from. 
@@ -218,7 +246,9 @@ fn sync_subchain(params: SyncParams) -> tokio::task::JoinHandle { info!(chain=%params.chain, params=%params,"Indexing DB is ready"); let mut first_indexed_block = params.first_indexed_block.clone(); + let mut first_immutable = params.first_is_immutable; let mut last_indexed_block = params.last_indexed_block.clone(); + let mut last_immutable = params.last_is_immutable; let mut blocks_synced = 0u64; let mut follower = @@ -228,7 +258,21 @@ fn sync_subchain(params: SyncParams) -> tokio::task::JoinHandle { cardano_chain_follower::Kind::ImmutableBlockRollForward => { // We only process these on the follower tracking the TIP. if params.end == TIP_POINT { - warn!("TODO: Immutable Chain roll forward"); + // What we need to do here is tell the primary follower to start a new sync + // for the new immutable data, and then purge the volatile database of the + // old data (after the immutable data has synced). + info!(chain=%params.chain, "Immutable chain rolled forward."); + let mut result = params.done( + first_indexed_block, + first_immutable, + last_indexed_block, + last_immutable, + blocks_synced, + Ok(()), + ); + // Signal the point the immutable chain rolled forward to. 
+ result.follower_roll_forward = Some(chain_update.block_data().point()); + return result; }; }, cardano_chain_follower::Kind::Block => { @@ -239,27 +283,37 @@ fn sync_subchain(params: SyncParams) -> tokio::task::JoinHandle { error!(chain=%params.chain, error=%error, params=%params, error_msg); return params.done( first_indexed_block, + first_immutable, last_indexed_block, + last_immutable, blocks_synced, Err(error.context(error_msg)), ); } + last_immutable = block.immutable(); + last_indexed_block = Some(block.point()); + if first_indexed_block.is_none() { + first_immutable = last_immutable; first_indexed_block = Some(block.point()); } - last_indexed_block = Some(block.point()); blocks_synced += 1; }, cardano_chain_follower::Kind::Rollback => { warn!("TODO: Live Chain rollback"); + // What we need to do here, is purge the live DB of records after the + // rollback point. We need to complete this operation here + // before we keep syncing the live chain. }, } } let result = params.done( first_indexed_block, + first_immutable, last_indexed_block, + last_immutable, blocks_synced, Ok(()), ); @@ -270,460 +324,234 @@ fn sync_subchain(params: SyncParams) -> tokio::task::JoinHandle { }) } -/// Start followers as per defined in the config -#[allow(unused)] -pub(crate) async fn start_followers() -> anyhow::Result<()> { - let cfg = Settings::follower_cfg(); +/// The synchronisation task, and its state. +/// There should ONLY ever be one of these at any time. +struct SyncTask { + /// Chain follower configuration. + cfg: chain_follower::EnvVars, - // Log the chain follower configuration. - cfg.log(); + /// The current running sync tasks. + sync_tasks: FuturesUnordered>, - // Start Syncing the blockchain, so we can consume its data as required. - start_sync_for(&cfg).await?; - info!(chain=%cfg.chain,"Chain Sync is started."); + /// // How many immutable chain follower sync tasks we are running. 
+ current_sync_tasks: u16, - tokio::spawn(async move { + /// Start for the next block we would sync. + start_slot: u64, + + /// The immutable tip slot. + immutable_tip_slot: u64, + + /// The live tip slot. + live_tip_slot: u64, + + /// Current Sync Status + sync_status: Vec, +} + +impl SyncTask { + /// Create a new `SyncTask`. + fn new(cfg: chain_follower::EnvVars) -> SyncTask { + SyncTask { + cfg, + sync_tasks: FuturesUnordered::new(), + start_slot: 0, + current_sync_tasks: 0, + immutable_tip_slot: 0, + live_tip_slot: 0, + sync_status: Vec::new(), + } + } + + /// Primary Chain Follower task. + /// + /// This continuously runs in the background, and never terminates. + async fn run(&mut self) { // We can't sync until the local chain data is synced. // This call will wait until we sync. - let tips = cardano_chain_follower::ChainFollower::get_tips(cfg.chain).await; - let immutable_tip_slot = tips.0.slot_or_default(); - let live_tip_slot = tips.1.slot_or_default(); - info!(chain=%cfg.chain, immutable_tip=immutable_tip_slot, live_tip=live_tip_slot, "Blockchain ready to sync from."); - - let mut sync_tasks: FuturesUnordered> = - FuturesUnordered::new(); - - // Start the Immutable Chain sync tasks. - // If the number of sync tasks is zero, just have one. - // Note: this shouldn't be possible, but easy to handle if it is. - let sub_chain_slots = immutable_tip_slot - .checked_div(cfg.sync_tasks.into()) - .unwrap_or(immutable_tip_slot); - // Need steps in a usize, in the highly unlikely event the steps are > max usize, make - // them max usize. - let sub_chain_steps: usize = sub_chain_slots.try_into().unwrap_or(usize::MAX); - - let mut start_point = ORIGIN_POINT; - for slot_end in (sub_chain_slots..immutable_tip_slot).step_by(sub_chain_steps) { - let next_point = cardano_chain_follower::Point::fuzzy(slot_end); - - sync_tasks.push(sync_subchain(SyncParams::new( - cfg.chain, - start_point, - next_point.clone(), - ))); - - // Next start == last end. 
- start_point = next_point; - } + let tips = cardano_chain_follower::ChainFollower::get_tips(self.cfg.chain).await; + self.immutable_tip_slot = tips.0.slot_or_default(); + self.live_tip_slot = tips.1.slot_or_default(); + info!(chain=%self.cfg.chain, immutable_tip=self.immutable_tip_slot, live_tip=self.live_tip_slot, "Blockchain ready to sync from."); - // Start the Live Chain sync task - This never stops syncing. - sync_tasks.push(sync_subchain(SyncParams::new( - cfg.chain, - start_point, + // Wait for indexing DB to be ready before continuing. + // We do this after the above, because other nodes may have finished already, and we don't + // want to wait do any work they already completed while we were fetching the blockchain. + CassandraSession::wait_is_ready(INDEXING_DB_READY_WAIT_INTERVAL).await; + info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state"); + self.sync_status = get_sync_status().await; + debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status); + + // Start the Live Chain sync task - This can never end because it is syncing to TIP. + // So, if it fails, it will automatically be restarted. + self.sync_tasks.push(sync_subchain(SyncParams::new( + self.cfg.chain, + cardano_chain_follower::Point::fuzzy(self.immutable_tip_slot), TIP_POINT, ))); + self.start_immutable_followers(); + // Wait Sync tasks to complete. If they fail and have not completed, reschedule them. + // If an immutable sync task ends OK, and we still have immutable data to sync then + // start a new task. // They will return from this iterator in the order they complete. - while let Some(completed) = sync_tasks.next().await { - let remaining_followers = sync_tasks.len(); - + // This iterator actually never ends, because the live sync task is always restarted. + while let Some(completed) = self.sync_tasks.next().await { match completed { Ok(finished) => { // Sync task finished. Check if it completed OK or had an error. 
// If it failed, we need to reschedule it. - let last_block = finished - .last_indexed_block - .clone() - .map_or("None".to_string(), |v| v.to_string()); - - let first_block = finished - .first_indexed_block - .clone() - .map_or("None".to_string(), |v| v.to_string()); - - // The TIP follower should NEVER end, even without error, so report that as an - // error. It can fail if the index DB goes down in some way. + // The TIP follower should NEVER end, unless there is an immutable roll forward, + // or there is an error. If this is not a roll forward, log an error. + // It can fail if the index DB goes down in some way. // Restart it always. if finished.end == TIP_POINT { - error!(chain=%cfg.chain, report=%finished, + if let Some(ref roll_forward_point) = finished.follower_roll_forward { + // Advance the known immutable tip, and try and start followers to reach + // it. + self.immutable_tip_slot = roll_forward_point.slot_or_default(); + self.start_immutable_followers(); + } else { + error!(chain=%self.cfg.chain, report=%finished, "The TIP follower failed, restarting it."); + } // Start the Live Chain sync task again from where it left off. - sync_tasks.push(sync_subchain(finished.retry())); + self.sync_tasks.push(sync_subchain(finished.retry())); } else if let Some(result) = finished.result.as_ref() { match result { Ok(()) => { - info!(chain=%cfg.chain, report=%finished, + self.current_sync_tasks -= 1; + info!(chain=%self.cfg.chain, report=%finished, "The Immutable follower completed successfully."); + + // If we need more immutable chain followers to sync the block + // chain, we can now start them. + self.start_immutable_followers(); }, Err(error) => { - // let report = &finished.to_string(); - error!(chain=%cfg.chain, report=%finished, + error!(chain=%self.cfg.chain, report=%finished, error=%error, "An Immutable follower failed, restarting it."); - // Start the Live Chain sync task again from where it left off. 
- sync_tasks.push(sync_subchain(finished.retry())); + // Restart the Immutable Chain sync task again from where it left + // off. + self.sync_tasks.push(sync_subchain(finished.retry())); }, } } else { - error!(chain=%cfg.chain, report=%finished, - "The Immutable follower completed, but without a proper result."); + error!(chain=%self.cfg.chain, report=%finished, + "BUG: The Immutable follower completed, but without a proper result."); } }, Err(error) => { - error!(error=%error, "Sync task failed. Can not restart it, not enough information. Sync is probably failed at this point."); + error!(chain=%self.cfg.chain, error=%error, "BUG: Sync task failed. Can not restart it, not enough information. Sync is probably failed at this point."); }, } - } - error!("Sync tasks have all stopped. This is an unexpected error!"); - }); - - Ok(()) -} - -const _UNUSED_CODE: &str = r#" - -/// Spawn follower threads and return associated handlers -async fn spawn_followers( - configs: Vec, _data_refresh_tick: u64, machine_id: String, -) -> anyhow::Result> { - let mut follower_tasks = Vec::new(); - - for config in &configs { - let follower_handler = spawn_follower( - config.network, - &config.relay, - machine_id.clone(), - &config.mithril_snapshot.path, - ) - .await?; + // TODO: IF there is only 1 chain follower left in sync_tasks, then all + // immutable followers have finished. + // When this happens we need to purge the live index of any records that exist + // before the current immutable tip. + // Note: to prevent a data race when multiple nodes are syncing, we probably + // want to put a gap in this, so that there are X slots of overlap + // between the live chain and immutable chain. This gap should be + // a parameter. + } - follower_tasks.push(follower_handler); + error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped. 
This is an unexpected error!"); } - Ok(follower_tasks) -} - -/// Initiate single follower and returns associated task handler -/// which facilitates future control over spawned threads. -async fn spawn_follower( - network: Network, relay: &str, machine_id: MachineId, snapshot: &str, -) -> anyhow::Result { - // Establish point at which the last follower stopped updating in order to pick up - // where it left off. If there was no previous follower, start indexing from - // genesis point. - let start_from = match EventDB::last_updated_state(network).await { - Ok((slot_no, block_hash, _)) => Point::new(slot_no.try_into()?, block_hash), - Err(err) if err.is::() => Point::Origin, - Err(err) => return Err(err), - }; - - info!("Starting {network:?} follower from {start_from:?}",); - - let mut follower = instantiate_follower(start_from, snapshot, network, relay).await?; - - let genesis_values = network_genesis_values(&network) - .ok_or(anyhow::anyhow!("Obtaining genesis values failed"))?; - - let task = tokio::spawn(async move { - process_blocks(&mut follower, network, machine_id, &genesis_values).await; - }); - - Ok(task) -} - -/// Process next block from the follower -async fn process_blocks( - follower: &mut Follower, network: Network, machine_id: MachineId, - genesis_values: &GenesisValues, -) { - info!("Follower started processing blocks"); - - let (blocks_tx, mut blocks_rx) = mpsc::channel(1); - - tokio::spawn({ - let genesis_values = genesis_values.clone(); - - async move { - let mut blocks_buffer = Vec::new(); - - let mut ticker = tokio::time::interval(Duration::from_secs(60)); - ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - - loop { - tokio::select! 
{ - res = blocks_rx.recv() => { - match res { - Some(block_data) => { - blocks_buffer.push(block_data); - - if blocks_buffer.len() >= MAX_BLOCKS_BATCH_LEN { - index_block_buffer(&genesis_values, network, &machine_id, std::mem::take(&mut blocks_buffer)).await; - - // Reset batch ticker since we just indexed the blocks buffer - ticker.reset(); - } - } - - None => { - break; - } - } - } - - _ = ticker.tick() => { - // This executes when we have not indexed blocks for more than the configured - // tick interval. This means that if any errors occur in that time we lose the buffered block data (e.g. - // cat-gateway is shutdown ungracefully). This is not a problem since cat-gateway - // checkpoints the latest database writes so it simply restarts from the last - // written block. - // - // This is most likely to happen when following from the tip or receiving blocks - // from the network (since updates will come at larger intervals). - if blocks_buffer.is_empty() { - continue; - } + /// Start immutable followers, if we can + fn start_immutable_followers(&mut self) { + // Start the Immutable Chain sync tasks, as required. + // We will start at most the number of configured sync tasks. + // The live chain sync task is not counted as a sync task for this config value. + + // Nothing to do if the start_slot is not less than the end of the immutable chain. + if self.start_slot < self.immutable_tip_slot { + // Will also break if there are no more slots left to sync. 
+ while self.current_sync_tasks < self.cfg.sync_tasks { + let end_slot = self + .immutable_tip_slot + .min(self.start_slot + self.cfg.sync_chunk_max_slots); + + if let Some((first_point, last_point)) = + self.get_syncable_range(self.start_slot, end_slot) + { + self.sync_tasks.push(sync_subchain(SyncParams::new( + self.cfg.chain, + first_point, + last_point.clone(), + ))); + self.current_sync_tasks += 1; + } - let current_buffer = std::mem::take(&mut blocks_buffer); - index_block_buffer(&genesis_values, network, &machine_id, current_buffer).await; + // The one slot overlap is deliberate, it doesn't hurt anything and prevents all off + // by one problems that may occur otherwise. + self.start_slot = end_slot; - // Reset the ticker so it counts the interval as starting after we wrote everything - // to the database. - ticker.reset(); - } + if end_slot == self.immutable_tip_slot { + break; } } + // `start_slot` is still used, because it is used to keep syncing chunks as required + // while each immutable sync task finishes. + info!(chain=%self.cfg.chain, tasks=self.current_sync_tasks, until=self.start_slot, "Persistent Indexing DB tasks started"); } - }); - - loop { - match follower.next().await { - Ok(chain_update) => { - match chain_update { - ChainUpdate::Block(data) => { - if blocks_tx.send(data).await.is_err() { - error!("Block indexing task not running"); - break; - }; - }, - ChainUpdate::Rollback(data) => { - let block = match data.decode() { - Ok(block) => block, - Err(err) => { - error!("Unable to decode {network:?} block {err} - skip.."); - continue; - }, - }; + } - info!( - "Rollback block NUMBER={} SLOT={} HASH={}", - block.number(), - block.slot(), - hex::encode(block.hash()), - ); - }, + /// Check if the requested range has already been indexed. + /// If it hasn't just return the slots as points. + /// If it has, return a subset that hasn't been indexed if any, or None if its been + /// completely indexed already. 
+ fn get_syncable_range(&self, start: u64, end: u64) -> Option<(Point, Point)> { + for sync_block in &self.sync_status { + // Check if we start within a previously synchronized block. + if start >= sync_block.start_slot && start <= sync_block.end_slot { + // Check if we are fully contained by the sync block, if so, nothing to sync. + if end <= sync_block.end_slot { + return None; } - }, - Err(err) => { - error!( - "Unable to receive next update from the {network:?} follower err: {err} - skip..", - ); - continue; - }, - } - } -} -/// Consumes a block buffer and indexes its data. -async fn index_block_buffer( - genesis_values: &GenesisValues, network: Network, machine_id: &MachineId, - buffer: Vec, -) { - info!("Starting data batch indexing"); - - let mut blocks = Vec::new(); - - for block_data in &buffer { - match block_data.decode() { - Ok(block) => blocks.push(block), - Err(e) => { - error!(error = ?e, "Failed to decode block"); - }, + // In theory, we could extend into another sync block, but because we could extend + // into an unbounded number of sync blocks, we would need to bust + // this range into an unbounded number of sub chunks. + // It is not a problem to sync the same data mutiple times, so for simplicity we do + // not account for this, if the requested range goes beyond the sync + // block it starts within we assume that the rest is not synced. + return Some(( + cardano_chain_follower::Point::fuzzy(sync_block.end_slot), + cardano_chain_follower::Point::fuzzy(end), + )); + } } - } - match index_many_blocks(genesis_values, network, machine_id, &blocks).await { - Ok(()) => { - info!("Finished indexing data batch"); - }, - Err(e) => { - error!(error = ?e, "Failed indexing data batch"); - }, + let start_slot = if start == 0 { + ORIGIN_POINT + } else { + cardano_chain_follower::Point::fuzzy(start) + }; + + Some((start_slot, cardano_chain_follower::Point::fuzzy(end))) } } -/// Index a slice of blocks. 
-async fn index_many_blocks( - genesis_values: &GenesisValues, network: Network, machine_id: &MachineId, - blocks: &[MultiEraBlock<'_>], -) -> anyhow::Result<()> { - let Some(last_block) = blocks.last() else { - return Ok(()); - }; - - let network_str = network.to_string(); - - index_blocks(genesis_values, &network_str, blocks).await?; - index_transactions(blocks, &network_str).await?; - index_voter_registrations(blocks, network).await?; - - match EventDB::refresh_last_updated( - chrono::offset::Utc::now(), - last_block.slot().try_into()?, - last_block.hash().to_vec(), - network, - machine_id, - ) - .await - { - Ok(()) => {}, - Err(err) => { - error!("Unable to mark {network:?} last update point {err} - skip..",); - }, - }; - - Ok(()) -} +/// Start followers as per defined in the config +pub(crate) async fn start_followers() -> anyhow::Result<()> { + let cfg = Settings::follower_cfg(); -/// Index the data from the given blocks. -async fn index_blocks( - genesis_values: &GenesisValues, network_str: &str, blocks: &[MultiEraBlock<'_>], -) -> anyhow::Result { - let values: Vec<_> = blocks - .iter() - .filter_map(|block| { - IndexedFollowerDataParams::from_block_data(genesis_values, network_str, block) - }) - .collect(); - - EventDB::index_many_follower_data(&values) - .await - .context("Indexing block data")?; - - Ok(values.len()) -} + // Log the chain follower configuration. + cfg.log(); -/// Index transactions (and its inputs and outputs) from a slice of blocks. -async fn index_transactions(blocks: &[MultiEraBlock<'_>], network_str: &str) -> anyhow::Result<()> { - let blocks_txs: Vec<_> = blocks - .iter() - .flat_map(|b| b.txs().into_iter().map(|tx| (b.slot(), tx))) - .collect(); + // Start Syncing the blockchain, so we can consume its data as required. 
+ start_sync_for(&cfg).await?; + info!(chain=%cfg.chain,"Chain Sync is started."); - index_transactions_data(network_str, &blocks_txs).await?; - index_transaction_outputs_data(&blocks_txs).await?; - index_transaction_inputs_data(&blocks_txs).await?; + tokio::spawn(async move { + let mut sync_task = SyncTask::new(cfg); + sync_task.run().await; + }); Ok(()) } - -/// Index transactions data. -async fn index_transactions_data( - network_str: &str, blocks_txs: &[(u64, MultiEraTx<'_>)], -) -> anyhow::Result { - let values: Vec<_> = blocks_txs - .iter() - .map(|(slot, tx)| { - Ok(IndexedTxnParams { - id: tx.hash().to_vec(), - slot_no: (*slot).try_into()?, - network: network_str, - }) - }) - .collect::>>()?; - - EventDB::index_many_txn_data(&values) - .await - .context("Indexing transaction data")?; - - Ok(values.len()) -} - -/// Index transaction outputs data. -async fn index_transaction_outputs_data( - blocks_txs: &[(u64, MultiEraTx<'_>)], -) -> anyhow::Result { - let values: Vec<_> = blocks_txs - .iter() - .flat_map(|(_, tx)| IndexedTxnOutputParams::from_txn_data(tx)) - .collect(); - - EventDB::index_many_txn_output_data(&values) - .await - .context("Indexing transaction outputs")?; - - Ok(values.len()) -} - -/// Index transaction inputs data. -async fn index_transaction_inputs_data( - blocks_txs: &[(u64, MultiEraTx<'_>)], -) -> anyhow::Result { - let values: Vec<_> = blocks_txs - .iter() - .flat_map(|(_, tx)| IndexedTxnInputParams::from_txn_data(tx)) - .collect(); - - EventDB::index_many_txn_input_data(&values) - .await - .context("Indexing transaction inputs")?; - - Ok(values.len()) -} - -/// Index voter registrations from a slice of blocks. 
-async fn index_voter_registrations( - blocks: &[MultiEraBlock<'_>], network: Network, -) -> anyhow::Result { - let values: Vec<_> = blocks - .iter() - .filter_map(|block| IndexedVoterRegistrationParams::from_block_data(block, network)) - .flatten() - .collect(); - - EventDB::index_many_voter_registration_data(&values) - .await - .context("Indexing voter registration")?; - - Ok(values.len()) -} - -/// Instantiate the follower. -/// If there is metadata present which allows us to bootstrap from a point in time -/// We start from there, if not; we start from genesis. -async fn instantiate_follower( - start_from: Point, snapshot: &str, network: Network, relay: &str, -) -> anyhow::Result { - let mut follower_cfg = FollowerConfigBuilder::default() - .follow_from(start_from) - .mithril_snapshot_path(PathBuf::from(snapshot)) - .build(); - - let follower = match Follower::connect(relay, network, follower_cfg.clone()).await { - Ok(follower) => follower, - Err(err) => { - error!("Unable to bootstrap via mithril snapshot {err}. Trying network..",); - - // We know bootstrapping from the snapshot fails, remove path and try from network - follower_cfg.mithril_snapshot_path = None; - Follower::connect(relay, network, follower_cfg).await? - }, - }; - - Ok(follower) -} - -"#; diff --git a/catalyst-gateway/bin/src/db/index/queries/cql/get_sync_status.cql b/catalyst-gateway/bin/src/db/index/queries/cql/get_sync_status.cql new file mode 100644 index 0000000000..713a302b6c --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/cql/get_sync_status.cql @@ -0,0 +1,7 @@ +-- Get all the sync status records. 
+SELECT + end_slot, + start_slot, + sync_time, + node_id +FROM sync_status; diff --git a/catalyst-gateway/bin/src/db/index/queries/cql/insert_sync_status.cql b/catalyst-gateway/bin/src/db/index/queries/cql/insert_sync_status.cql new file mode 100644 index 0000000000..95aaf9fe95 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/cql/insert_sync_status.cql @@ -0,0 +1,12 @@ +-- Insert an update to the synchronisation status table +INSERT INTO sync_status ( + end_slot, + start_slot, + sync_time, + node_id +) VALUES ( + :end_slot, + :start_slot, + :sync_time, + :node_id +); diff --git a/catalyst-gateway/bin/src/db/index/queries/cql/update_txo_spent.cql b/catalyst-gateway/bin/src/db/index/queries/cql/update_txo_spent.cql index e74704815c..58a33ad2b3 100644 --- a/catalyst-gateway/bin/src/db/index/queries/cql/update_txo_spent.cql +++ b/catalyst-gateway/bin/src/db/index/queries/cql/update_txo_spent.cql @@ -1,6 +1,7 @@ +-- Update TXO Spent by the stake address UPDATE txo_by_stake -SET spent_slot = :spent_slot + SET spent_slot = :spent_slot WHERE stake_address = :stake_address -AND txn = :txn -AND txo = :txo -AND slot_no = :slot_no + AND txn = :txn + AND txo = :txo + AND slot_no = :slot_no diff --git a/catalyst-gateway/bin/src/db/index/queries/mod.rs b/catalyst-gateway/bin/src/db/index/queries/mod.rs index e34db0647e..c14eb303e7 100644 --- a/catalyst-gateway/bin/src/db/index/queries/mod.rs +++ b/catalyst-gateway/bin/src/db/index/queries/mod.rs @@ -3,6 +3,7 @@ //! This improves query execution time. 
pub(crate) mod staked_ada; +pub(crate) mod sync_status; use std::{fmt::Debug, sync::Arc}; @@ -16,6 +17,7 @@ use staked_ada::{ get_txi_by_txn_hash::GetTxiByTxnHashesQuery, get_txo_by_stake_address::GetTxoByStakeAddressQuery, update_txo_spent::UpdateTxoSpentQuery, }; +use sync_status::update::SyncStatusInsertQuery; use super::block::{ certs::CertInsertQuery, cip36::Cip36InsertQuery, txi::TxiInsertQuery, txo::TxoInsertQuery, @@ -51,7 +53,7 @@ pub(crate) enum PreparedQuery { TxoSpentUpdateQuery, } -/// All prepared SELECT query statements. +/// All prepared SELECT query statements (return data). pub(crate) enum PreparedSelectQuery { /// Get TXO by stake address query. GetTxoByStakeAddress, @@ -59,6 +61,12 @@ pub(crate) enum PreparedSelectQuery { GetTxiByTransactionHash, } +/// All prepared UPSERT query statements (inserts/updates a single value of data). +pub(crate) enum PreparedUpsertQuery { + /// Sync Status Insert + SyncStatusInsert, +} + /// All prepared queries for a session. #[allow(clippy::struct_field_names)] pub(crate) struct PreparedQueries { @@ -86,6 +94,8 @@ pub(crate) struct PreparedQueries { txo_by_stake_address_query: PreparedStatement, /// Get TXI by transaction hash. txi_by_txn_hash_query: PreparedStatement, + /// Insert Sync Status update. 
+ sync_status_insert: PreparedStatement, } /// An individual query response that can fail @@ -110,6 +120,7 @@ impl PreparedQueries { UpdateTxoSpentQuery::prepare_batch(session.clone(), cfg).await; let txo_by_stake_address_query = GetTxoByStakeAddressQuery::prepare(session.clone()).await; let txi_by_txn_hash_query = GetTxiByTxnHashesQuery::prepare(session.clone()).await; + let sync_status_insert = SyncStatusInsertQuery::prepare(session).await; let ( txo_insert_queries, @@ -137,6 +148,7 @@ impl PreparedQueries { txo_spent_update_queries: txo_spent_update_queries?, txo_by_stake_address_query: txo_by_stake_address_query?, txi_by_txn_hash_query: txi_by_txn_hash_query?, + sync_status_insert: sync_status_insert?, }) } @@ -183,6 +195,25 @@ impl PreparedQueries { Ok(sized_batches) } + /// Executes a single query with the given parameters. + /// + /// Returns no data, and an error if the query fails. + pub(crate) async fn execute_upsert

( + &self, session: Arc, upsert_query: PreparedUpsertQuery, params: P, + ) -> anyhow::Result<()> + where P: SerializeRow { + let prepared_stmt = match upsert_query { + PreparedUpsertQuery::SyncStatusInsert => &self.sync_status_insert, + }; + + session + .execute_unpaged(prepared_stmt, params) + .await + .map_err(|e| anyhow::anyhow!(e))?; + + Ok(()) + } + /// Executes a select query with the given parameters. /// /// Returns an iterator that iterates over all the result pages that the query diff --git a/catalyst-gateway/bin/src/db/index/queries/sync_status/get.rs b/catalyst-gateway/bin/src/db/index/queries/sync_status/get.rs new file mode 100644 index 0000000000..804192462b --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/sync_status/get.rs @@ -0,0 +1,193 @@ +//! Get Sync Status query + +use futures::StreamExt; +use tracing::{debug, warn}; + +use super::update::row::SyncStatusQueryParams; +use crate::{db::index::session::CassandraSession, service::utilities::convert::from_saturating}; + +/// Get TXI query string. +const GET_SYNC_STATUS: &str = include_str!("../cql/get_sync_status.cql"); + +/// Clean Sync Status Response +#[derive(PartialEq, Debug)] +pub(crate) struct SyncStatus { + /// End Slot. + pub(crate) end_slot: u64, + /// Start Slot. + pub(crate) start_slot: u64, + /// Sync Time. + pub(crate) sync_time: u64, + /// Node ID + pub(crate) node_id: String, +} + +/// Convert a big uint to a u64, saturating if its out of range. +fn big_uint_to_u64(value: &num_bigint::BigInt) -> u64 { + let (sign, digits) = value.to_u64_digits(); + if sign == num_bigint::Sign::Minus || digits.is_empty() { + return 0; + } + if digits.len() > 1 { + return u64::MAX; + } + // 100% safe due to the above checks. + #[allow(clippy::indexing_slicing)] + digits[0] +} + +/// Merge consecutive sync records, to make processing them easier. 
+fn merge_consecutive_sync_records(mut synced_chunks: Vec) -> Vec { + // Sort the chunks by the starting key, if the ending key overlaps, we will deal with that + // during the merge. + synced_chunks.sort_by_key(|rec| rec.start_slot); + + let mut best_sync: Vec = vec![]; + let mut current_status: Option = None; + for rec in synced_chunks { + if let Some(current) = current_status.take() { + if rec.start_slot >= current.start_slot && rec.end_slot <= current.end_slot { + // The new record is contained fully within the previous one. + // We will ignore the new record and use the previous one instead. + current_status = Some(current); + } else if rec.start_slot <= current.end_slot + 1 { + // Either overlaps, or is directly consecutive. + // But not fully contained within the previous one. + current_status = Some(SyncStatus { + end_slot: rec.end_slot, + start_slot: current.start_slot, + sync_time: rec.sync_time.max(current.sync_time), + node_id: rec.node_id, + }); + } else { + // Not consecutive, so store it. + // And set a new current one. + best_sync.push(current); + current_status = Some(rec); + } + } else { + current_status = Some(rec); + } + } + // Could have the final one in current still, so store it + if let Some(current) = current_status.take() { + best_sync.push(current); + } + + best_sync +} + +/// Get the sync status. +/// +/// Note: This only happens once when a node starts. So there is no need to prepare it. +/// It is also only ever run on the persistent database. +/// +/// Regarding failures: +/// Failures of this function will simply cause the node to re-sync which is non fatal. +pub(crate) async fn get_sync_status() -> Vec { + let mut synced_chunks: Vec = vec![]; + + let Some(session) = CassandraSession::get(true) else { + warn!("Failed to get Cassandra Session, trying to get current indexing status"); + return synced_chunks; + }; + + // Get the raw underlying session, so we can do an unprepared simple query. 
+ let session = session.get_raw_session(); + + let mut results = match session.query_iter(GET_SYNC_STATUS, ()).await { + Ok(result) => result.into_typed::(), + Err(err) => { + warn!(error=%err, "Failed to get sync status results from query."); + return synced_chunks; + }, + }; + + // Get all the sync records, and de-cassandra-ize the values + while let Some(next_row) = results.next().await { + match next_row { + Err(err) => warn!(error=%err, "Failed to deserialize sync status results from query."), + Ok(row) => { + debug!("Sync Status: {:?}", row); + synced_chunks.push(SyncStatus { + end_slot: big_uint_to_u64(&row.end_slot), + start_slot: big_uint_to_u64(&row.start_slot), + sync_time: from_saturating(row.sync_time.0), + node_id: row.node_id, + }); + }, + } + } + + merge_consecutive_sync_records(synced_chunks) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + /// This test checks we can properly merge sync status chunks. + fn test_sync_merge() { + // Add some test records, out of order. 
+ // Two mergeable groups + let synced_chunks: Vec = vec![ + SyncStatus { + end_slot: 200_000, + start_slot: 112_001, + sync_time: 1_200_000, + node_id: "test-node-1".to_string(), + }, + SyncStatus { + end_slot: 12000, + start_slot: 0, + sync_time: 100_100, + node_id: "test-node-1".to_string(), + }, + SyncStatus { + end_slot: 99000, + start_slot: 56789, + sync_time: 200_000, + node_id: "test-node-2".to_string(), + }, + SyncStatus { + end_slot: 112_000, + start_slot: 100_000, + sync_time: 1_100_100, + node_id: "test-node-1".to_string(), + }, + SyncStatus { + end_slot: 56789, + start_slot: 12300, + sync_time: 200_000, + node_id: "test-node-2".to_string(), + }, + SyncStatus { + end_slot: 12345, + start_slot: 0, + sync_time: 100_000, + node_id: "test-node-1".to_string(), + }, + ]; + + let merged_syncs_status = merge_consecutive_sync_records(synced_chunks); + + // Expected result + let expected: &[SyncStatus] = &[ + SyncStatus { + end_slot: 99000, + start_slot: 0, + sync_time: 200_000, + node_id: "test-node-2".to_string(), + }, + SyncStatus { + end_slot: 200_000, + start_slot: 100_000, + sync_time: 1_200_000, + node_id: "test-node-1".to_string(), + }, + ]; + + assert_eq!(merged_syncs_status.as_slice(), expected); + } +} diff --git a/catalyst-gateway/bin/src/db/index/queries/sync_status/mod.rs b/catalyst-gateway/bin/src/db/index/queries/sync_status/mod.rs new file mode 100644 index 0000000000..9f2fa40cc5 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/sync_status/mod.rs @@ -0,0 +1,4 @@ +//! sync status update and query. + +pub(crate) mod get; +pub(crate) mod update; diff --git a/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs b/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs new file mode 100644 index 0000000000..03563d2f71 --- /dev/null +++ b/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs @@ -0,0 +1,122 @@ +//! Read and write the synchronisation status. 
+ +use std::{sync::Arc, time::SystemTime}; + +use row::SyncStatusQueryParams; +use scylla::{frame::value::CqlTimestamp, prepared_statement::PreparedStatement, Session}; +use tokio::task; +use tracing::{error, warn}; + +use crate::{ + db::index::{ + queries::{PreparedQueries, PreparedUpsertQuery}, + session::CassandraSession, + }, + service::utilities::convert::from_saturating, + settings::Settings, +}; + +/// Insert Sync Status query string. +const INSERT_SYNC_STATUS_QUERY: &str = include_str!("../cql/insert_sync_status.cql"); + +/// Sync Status Row Record Module +#[allow(clippy::expect_used)] +pub(super) mod row { + use scylla::{frame::value::CqlTimestamp, FromRow, SerializeRow}; + + /// Sync Status Record Row (used for both Insert and Query response) + #[derive(SerializeRow, FromRow, Debug)] + pub(crate) struct SyncStatusQueryParams { + /// End Slot. + pub(crate) end_slot: num_bigint::BigInt, + /// Start Slot. + pub(crate) start_slot: num_bigint::BigInt, + /// Sync Time. + pub(crate) sync_time: CqlTimestamp, + /// Node ID + pub(crate) node_id: String, + } +} + +impl SyncStatusQueryParams { + /// Create a new instance of [`SyncStatusQueryParams`] + pub(crate) fn new(end_slot: u64, start_slot: u64) -> Self { + let sync_time = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) { + Ok(now) => now.as_millis(), + Err(_) => 0, // Shouldn't actually happen. + }; + + Self { + end_slot: end_slot.into(), + start_slot: start_slot.into(), + sync_time: CqlTimestamp(from_saturating(sync_time)), + node_id: Settings::service_id().to_owned(), + } + } +} + +/// Sync Status Insert query. +pub(crate) struct SyncStatusInsertQuery; + +impl SyncStatusInsertQuery { + /// Prepares a Sync Status Insert query. 
+/// Failures of this function to record status, fail safely.
+/// This data is only used to recover sync.
+/// Therefore this function is both fire and forget, and returns no status.
start_slot varint, -- The slot the sync block started at (inclusive). sync_time timestamp, -- The time we finished the sync. - node_id uuid, -- The node that synced this data. + node_id text, -- The node that synced this data. PRIMARY KEY (end_slot, start_slot, sync_time, node_id) ); diff --git a/catalyst-gateway/bin/src/db/index/schema/mod.rs b/catalyst-gateway/bin/src/db/index/schema/mod.rs index 61e2f2057b..076e886d36 100644 --- a/catalyst-gateway/bin/src/db/index/schema/mod.rs +++ b/catalyst-gateway/bin/src/db/index/schema/mod.rs @@ -17,7 +17,7 @@ use crate::{settings::cassandra_db, utils::blake2b_hash::generate_uuid_string_fr /// change accidentally, and is NOT to be used directly to set the schema version of the /// table namespaces. #[allow(dead_code)] -const SCHEMA_VERSION: &str = "a0e54866-1f30-8ad2-9ac7-df1cfaf9c634"; +const SCHEMA_VERSION: &str = "10463640-3b7b-8a25-9d42-5eb64e44bd62"; /// Keyspace Create (Templated) const CREATE_NAMESPACE_CQL: &str = include_str!("./cql/namespace.cql"); diff --git a/catalyst-gateway/bin/src/db/index/session.rs b/catalyst-gateway/bin/src/db/index/session.rs index 41c447634c..2f29c321eb 100644 --- a/catalyst-gateway/bin/src/db/index/session.rs +++ b/catalyst-gateway/bin/src/db/index/session.rs @@ -16,7 +16,10 @@ use tokio::fs; use tracing::{error, info}; use super::{ - queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, PreparedSelectQuery}, + queries::{ + FallibleQueryResults, PreparedQueries, PreparedQuery, PreparedSelectQuery, + PreparedUpsertQuery, + }, schema::create_schema, }; use crate::{ @@ -132,6 +135,22 @@ impl CassandraSession { queries.execute_batch(session, cfg, query, values).await } + + /// Execute a query which returns no results, except an error if it fails. + /// Can not be batched, takes a single set of parameters. 
+    /// Execute a query which returns no results, except an error if it fails.
+    /// Cannot be batched, takes a single set of parameters.
+/// Minimum the number of slots each sync task will process at one time can be set to.
+/// Note: This is just the setting minimum; a sync task may sync as few as 1 slot.
+const MIN_SYNC_MAX_SLOTS: u64 = 10_000;
VERSION 0.8 -IMPORT github.com/input-output-hk/catalyst-ci/earthly/postgresql:v3.2.10 AS postgresql-ci +IMPORT github.com/input-output-hk/catalyst-ci/earthly/postgresql:v3.2.14 AS postgresql-ci # cspell: words diff --git a/catalyst-gateway/tests/Earthfile b/catalyst-gateway/tests/Earthfile index 5c12cc9fa0..e902d93768 100644 --- a/catalyst-gateway/tests/Earthfile +++ b/catalyst-gateway/tests/Earthfile @@ -1,5 +1,5 @@ VERSION 0.8 -IMPORT github.com/input-output-hk/catalyst-ci/earthly/spectral:v3.2.10 AS spectral-ci +IMPORT github.com/input-output-hk/catalyst-ci/earthly/spectral:v3.2.14 AS spectral-ci # test-lint-openapi - OpenAPI linting from an artifact # testing whether the OpenAPI generated during build stage follows good practice. diff --git a/catalyst-gateway/tests/api_tests/Earthfile b/catalyst-gateway/tests/api_tests/Earthfile index ebbb339d6e..aa9bd062f8 100644 --- a/catalyst-gateway/tests/api_tests/Earthfile +++ b/catalyst-gateway/tests/api_tests/Earthfile @@ -1,6 +1,6 @@ VERSION 0.8 -IMPORT github.com/input-output-hk/catalyst-ci/earthly/python:v3.2.10 AS python-ci +IMPORT github.com/input-output-hk/catalyst-ci/earthly/python:v3.2.14 AS python-ci builder: FROM python-ci+python-base diff --git a/catalyst_voices/Earthfile b/catalyst_voices/Earthfile index 0339877018..31ec489635 100644 --- a/catalyst_voices/Earthfile +++ b/catalyst_voices/Earthfile @@ -1,7 +1,7 @@ VERSION 0.8 IMPORT ../catalyst-gateway AS catalyst-gateway -IMPORT github.com/input-output-hk/catalyst-ci/earthly/flutter:v3.2.10 AS flutter-ci +IMPORT github.com/input-output-hk/catalyst-ci/earthly/flutter:v3.2.14 AS flutter-ci # Copy all the necessary files and running bootstrap builder: diff --git a/catalyst_voices/uikit_example/Earthfile b/catalyst_voices/uikit_example/Earthfile index eed673c6ce..8f3e3cdcbe 100644 --- a/catalyst_voices/uikit_example/Earthfile +++ b/catalyst_voices/uikit_example/Earthfile @@ -1,7 +1,7 @@ VERSION 0.8 IMPORT ../ AS catalyst-voices -IMPORT 
github.com/input-output-hk/catalyst-ci/earthly/flutter:v3.2.10 AS flutter-ci +IMPORT github.com/input-output-hk/catalyst-ci/earthly/flutter:v3.2.14 AS flutter-ci # local-build-web - build web version of UIKit example. # Prefixed by "local" to make sure it's not auto triggered, the target was diff --git a/catalyst_voices_packages/catalyst_cardano/catalyst_cardano/wallet-automation/Earthfile b/catalyst_voices_packages/catalyst_cardano/catalyst_cardano/wallet-automation/Earthfile index 3ae373157a..bad2748bd8 100644 --- a/catalyst_voices_packages/catalyst_cardano/catalyst_cardano/wallet-automation/Earthfile +++ b/catalyst_voices_packages/catalyst_cardano/catalyst_cardano/wallet-automation/Earthfile @@ -1,6 +1,6 @@ VERSION 0.8 -IMPORT github.com/input-output-hk/catalyst-ci/earthly/flutter:v3.2.10 AS flutter-ci -IMPORT github.com/input-output-hk/catalyst-ci/earthly/playwright:v3.2.10 AS playwright-ci +IMPORT github.com/input-output-hk/catalyst-ci/earthly/flutter:v3.2.14 AS flutter-ci +IMPORT github.com/input-output-hk/catalyst-ci/earthly/playwright:v3.2.14 AS playwright-ci deps: DO playwright-ci+SETUP --workdir=/wallet-automation diff --git a/docs/Earthfile b/docs/Earthfile index a027fcdece..a042410367 100644 --- a/docs/Earthfile +++ b/docs/Earthfile @@ -1,6 +1,6 @@ VERSION 0.8 -IMPORT github.com/input-output-hk/catalyst-ci/earthly/docs:v3.2.10 AS docs-ci +IMPORT github.com/input-output-hk/catalyst-ci/earthly/docs:v3.2.14 AS docs-ci IMPORT .. 
AS repo IMPORT ../catalyst-gateway AS catalyst-gateway diff --git a/utilities/local-scylla/justfile b/utilities/local-scylla/justfile index 2cbf82e207..7d215b2cb3 100644 --- a/utilities/local-scylla/justfile +++ b/utilities/local-scylla/justfile @@ -36,7 +36,7 @@ scylla-dev-db-reset-cluster: scylla-dev-db-purge scylla-dev-db-cluster # Run CQLSH on the dev Scylla cluster scylla-dev-db-cqlsh: - docker run --rm -it scylladb/scylla-cqlsh `hostname` 9043 + docker run --rm -it scylladb/scylla-cqlsh "{{host_ip}}" 9042 # Run Nodetool on the dev Scylla cluster to dump status info. scylla-dev-db-nodetool: