From b4c4d50fb30d984af69bd6cae816f488eee70738 Mon Sep 17 00:00:00 2001 From: Brooke Hamilton <45323234+brooke-hamilton@users.noreply.github.com> Date: Fri, 27 Sep 2024 03:33:48 -0400 Subject: [PATCH 1/2] Update the bicep extension used in the dev container (#7969) # Description Updated the Bicep VS Code extension in the dev container to use the current version of the extension instead of the previous Radius Bicep extension. ## Type of change - This pull request is a minor refactor, code cleanup, test improvement, or other maintenance task and doesn't change the functionality of Radius (issue link optional). Fixes: N/A Signed-off-by: Brooke Hamilton <45323234+brooke-hamilton@users.noreply.github.com> --- .devcontainer/contributor/devcontainer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.devcontainer/contributor/devcontainer.json b/.devcontainer/contributor/devcontainer.json index 4809ae6f2d..c7aff43310 100644 --- a/.devcontainer/contributor/devcontainer.json +++ b/.devcontainer/contributor/devcontainer.json @@ -34,7 +34,7 @@ "golang.go", "ms-python.python", "ms-python.vscode-pylance", - "ms-azuretools.rad-vscode-bicep", + "ms-azuretools.vscode-bicep", "ms-kubernetes-tools.vscode-kubernetes-tools", "ms-azuretools.vscode-dapr", "ms-vscode.makefile-tools" From 223ec1ee1c22efe8b8c47c3046f389d0b6b83dc1 Mon Sep 17 00:00:00 2001 From: Ryan Nowak Date: Fri, 27 Sep 2024 12:50:03 -0700 Subject: [PATCH 2/2] Implement PostgreSQL datastore (#7961) # Description This change implements our datastore abstraction using PostgreSQL. This is an easier and better option for users to manage than our current set of datastores. The next pull-request in this series will hook it up to Radius so we can start using it. Ultimately we can replace the current default of Kubernetes CRD-store for both dev and deployed configurations in a few steps. Also included in this pull-request is a set of new makefile commands (`make db-*`) to make it easier to work with the local database. As always run `make` to see the command-line help. ## Type of change - This pull request is a minor refactor, code cleanup, test improvement, or other maintenance task and doesn't change the functionality of Radius (issue link optional). Signed-off-by: Ryan Nowak --- Makefile | 2 +- build/db.mk | 117 +++++ deploy/init-db/db.sql.txt | 66 +++ deploy/init-db/init-db.sh | 23 + go.mod | 16 +- go.sum | 30 +- pkg/ucp/dataprovider/factory.go | 34 +- pkg/ucp/dataprovider/options.go | 17 + pkg/ucp/dataprovider/types.go | 3 + pkg/ucp/store/inmemory/client.go | 24 +- pkg/ucp/store/postgres/postgresclient.go | 422 ++++++++++++++++++ pkg/ucp/store/postgres/postgresclient_test.go | 87 ++++ pkg/ucp/store/storeutil/id.go | 10 +- pkg/ucp/store/storeutil/id_test.go | 8 +- 14 files changed, 813 insertions(+), 46 deletions(-) create mode 100644 build/db.mk create mode 100644 deploy/init-db/db.sql.txt create mode 100755 deploy/init-db/init-db.sh create mode 100644 pkg/ucp/store/postgres/postgresclient.go create mode 100644 pkg/ucp/store/postgres/postgresclient_test.go diff --git a/Makefile b/Makefile index 36968036aa..9a69757f0f 100644 --- a/Makefile +++ b/Makefile @@ -17,4 +17,4 @@ ARROW := \033[34;1m=>\033[0m # order matters for these -include build/help.mk build/version.mk build/build.mk build/util.mk build/generate.mk build/test.mk build/docker.mk build/recipes.mk build/install.mk build/debug.mk +include build/help.mk build/version.mk build/build.mk build/util.mk build/generate.mk build/test.mk build/docker.mk build/recipes.mk build/install.mk build/db.mk build/debug.mk diff --git a/build/db.mk b/build/db.mk new file mode 100644 index 0000000000..9707e746fe --- /dev/null +++ b/build/db.mk @@ -0,0 +1,117 @@ +# ------------------------------------------------------------ +# Copyright 2023 The Radius Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ------------------------------------------------------------ + +POSTGRES_USER=postgres +POSTGRES_PASSWORD=ucprulz! +POSTGRES_IMAGE=ghcr.io/radius-project/mirror/postgres:latest +POSTGRES_CONTAINER_NAME=radius-postgres + +##@ Database + +.PHONY: db-init +db-init: db-dependency-docker-running ## Initialize a local PostgresSQL database for testing + @echo "$(ARROW) Initializing local PostgresSQL database" + @if [ "$$( docker container inspect -f '{{.State.Running}}' $(POSTGRES_CONTAINER_NAME) 2> /dev/null)" = "true" ]; then \ + echo "PostgresSQL container $(POSTGRES_CONTAINER_NAME) is already runnning"; \ + elif [ "$$( docker container inspect -f '{{.State.Running}}' $(POSTGRES_CONTAINER_NAME) 2> /dev/null)" = "false" ]; then \ + echo "PostgresSQL container $(POSTGRES_CONTAINER_NAME) is not running"; \ + echo "This might have been a crash"; \ + echo ""; \ + docker logs $(POSTGRES_CONTAINER_NAME); \ + echo ""; \ + echo "Restarting PostgresSQL container $(POSTGRES_CONTAINER_NAME)" \ + docker start $(POSTGRES_CONTAINER_NAME) 1> /dev/null; \ + else \ + docker run \ + --detach \ + --name $(POSTGRES_CONTAINER_NAME) \ + --publish 5432:5432 \ + --env POSTGRES_USER=$(POSTGRES_USER) \ + --env POSTGRES_PASSWORD=$(POSTGRES_PASSWORD) \ + --volume $(PWD)/deploy/init-db/:/docker-entrypoint-initdb.d/ \ + $(POSTGRES_IMAGE) 1> /dev/null; \ + echo "Started PostgresSQL container $(POSTGRES_CONTAINER_NAME)"; \ + fi; + @echo "" + @echo "Use PostgreSQL in tests:" + @echo "" + @echo "export TEST_POSTGRES_URL=postgresql://$(POSTGRES_USER):$(POSTGRES_PASSWORD)@localhost:5432/ucp" + @echo "" + @echo "Makefile cheatsheet:" + @echo " - Stop the database $(ARROW) make db-stop" + @echo " - Reset the database $(ARROW) make db-reset" + @echo " - Logs $(ARROW) docker logs $(POSTGRES_CONTAINER_NAME)" + @echo " - Connect to the database server: make db-shell" + @echo " - Shell tip: Connect to UCP database $(ARROW) \\\\c ucp" + @echo " - Shell tip: Connect to applications_rp database $(ARROW) \\\\c applications_rp" + @echo " - Shell tip: List resources $(ARROW) select * from resources;" + +.PHONY: db-stop +db-stop: db-dependency-docker-running ## Stop the local PostgresSQL database + @echo "$(ARROW) Stopping local PostgresSQL database..." + @if [ "$$( docker container inspect -f '{{.State.Running}}' $(POSTGRES_CONTAINER_NAME) 2> /dev/null)" = "true" ]; then \ + docker stop $(POSTGRES_CONTAINER_NAME) 1> /dev/null; \ + else \ + echo "PostgresSQL container $(POSTGRES_CONTAINER_NAME) is not running"; \ + fi; + +.PHONY: db-shell +db-shell: db-postgres-running ## Open a shell to the local PostgresSQL database + @echo "$(ARROW) Connecting to local PostgresSQL database..." + @DOCKER_CLI_HINTS=false docker exec \ + --interactive \ + --tty \ + $(POSTGRES_CONTAINER_NAME) \ + psql \ + "postgres://$(POSTGRES_USER):$(POSTGRES_PASSWORD)@localhost:5432" + +.PHONY: db-reset +db-reset: db-postgres-running ## Reset the local PostgresSQL database + @echo "$(ARROW) Resetting local PostgresSQL database" + @echo "" + @echo "Resetting ucp resources..." + @DOCKER_CLI_HINTS=false docker exec \ + --interactive \ + --tty \ + $(POSTGRES_CONTAINER_NAME) \ + psql \ + "postgres://$(POSTGRES_USER):$(POSTGRES_PASSWORD)@localhost:5432/ucp" \ + --command "DELETE FROM resources;" + @echo "" + @echo "Resetting applications_rp resources..." + @DOCKER_CLI_HINTS=false docker exec \ + --interactive \ + --tty \ + $(POSTGRES_CONTAINER_NAME) \ + psql \ + "postgres://$(POSTGRES_USER):$(POSTGRES_PASSWORD)@localhost:5432/applications_rp" \ + --command "DELETE FROM resources;" + +.PHONY: db-dependency-docker-running +db-dependency-docker-running: + @if [ ! docker info > /dev/null 2>&1 ]; then \ + echo "Docker is not installed or not running. Please install docker and try again."; \ + exit 1; \ + fi; + +.PHONY: db-postgres-running +db-postgres-running: db-dependency-docker-running + @if [ "$$( docker container inspect -f '{{.State.Running}}' $(POSTGRES_CONTAINER_NAME) 2> /dev/null)" = "true" ]; then \ + exit 0; \ + else \ + echo "PostgresSQL container $(POSTGRES_CONTAINER_NAME) is not running"; \ + exit 1; \ + fi; \ No newline at end of file diff --git a/deploy/init-db/db.sql.txt b/deploy/init-db/db.sql.txt new file mode 100644 index 0000000000..c76e71421e --- /dev/null +++ b/deploy/init-db/db.sql.txt @@ -0,0 +1,66 @@ +-- 'resources' is used to store all of our resources. See comments below for an explanation of the columns. +CREATE TABLE resources ( + -- example: "/planes/radius/local/resourceGroups/rg1/providers/Applications.Core/applications/my-app" + -- + -- We use columns to break out the most important components of the resource id for optimal querying. + -- + -- Since resource ids are case-insensitive we canonicalize these columns to lowercase. + -- We store the original resource id with the original casing so users can work with their preferred + -- naming/casing conventions. + -- + -- We ensure a leading and trailing slash on the components of the resource id for ease of comparison. + -- + -- id -> "/planes/radius/local/resourcegroups/rg1/providers/applications.core/applications/my-app/" + -- resource_type -> "/applications.core/applications/" + -- root_scope -> "/planes/radius/local/resourcegroups/rg1/" + -- routing_scope -> "/applications.core/applications/my-app/" + + -- resource id used as key. + id TEXT PRIMARY KEY NOT NULL, + + -- original_id is used to store the original id of the resource before any normalization occurs. + -- This is provided for compatability with the existing design of the store API, and can be removed + -- in the future. + original_id TEXT NOT NULL, + + -- resource type by queries to filter by type. + resource_type TEXT NOT NULL, + + -- root_scope used by queries to list resources by their scope. + root_scope TEXT NOT NULL, + + -- routing_scope used by queries to list resources when they are child resources. + routing_scope TEXT NOT NULL, + + -- etag used for optimistic concurrency control. + etag TEXT NOT NULL, + + -- timestamp is used to implement cursor-based pagination (see below). + created_at TIMESTAMP (6) WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + + -- resource_data stores the resource data. + resource_data JSONB NOT NULL +); + +-- idx_resource_query is an index for improving performance of queries. +-- +-- Queries always list resources by their: +-- - resource_type, and root_scope OR +-- - resource_type, root_scope, and (LIKE) routing_scope +-- +-- eg: "find all Applications.Core/applications resources in /planes/radius/local/resourceGroups/my-rg" +-- +-- > "resource_type" = "/applications.core/applications/" +-- > "root_scope" = "/planes/radius/local/resourcegroups/my-rg" +-- > "routing_scope" = NULL +-- +-- 'created_at' is used with ORDER BY to sort the output, so we can implement cursor-based pagination. +-- +-- 1) For the initial query, we won't specify a cursor value. +-- 2) For the next query, we will specify the cursor value as the last created_at value from the previous +-- query, which allows us to skip the records that were already returned. +-- +-- The index only contains resource_type and root_scope because these are usually specified exactly. +-- We don't really benefit from routing_scope being in the index because it's always used with LIKE. +-- We don't benefit from created_at being in the index because it's used for sorting. +CREATE INDEX idx_resource_query ON resources (resource_type, root_scope); diff --git a/deploy/init-db/init-db.sh b/deploy/init-db/init-db.sh new file mode 100755 index 0000000000..29df96d9df --- /dev/null +++ b/deploy/init-db/init-db.sh @@ -0,0 +1,23 @@ +#!/bin/bash +set -e + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +# Array of usernames +RESOURCE_PROVIDERS=("ucp" "applications_rp") + +# Create databases and users +for RESOURCE_PROVIDER in "${RESOURCE_PROVIDERS[@]}"; do + echo "Creating database and user for $RESOURCE_PROVIDER" + psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL + CREATE USER $RESOURCE_PROVIDER WITH PASSWORD '$POSTGRES_PASSWORD'; + CREATE DATABASE $RESOURCE_PROVIDER; + GRANT ALL PRIVILEGES ON DATABASE $RESOURCE_PROVIDER TO $RESOURCE_PROVIDER; +EOSQL +done + +# Create tables within those databases +for RESOURCE_PROVIDER in "${RESOURCE_PROVIDERS[@]}"; do + echo "Creating tables in database $RESOURCE_PROVIDER" + psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$RESOURCE_PROVIDER" < $SCRIPT_DIR/db.sql.txt +done \ No newline at end of file diff --git a/go.mod b/go.mod index a99fdf4af7..4e6a0a5132 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/cloudcontrol v1.20.5 github.com/aws/aws-sdk-go-v2/service/cloudformation v1.53.5 github.com/aws/aws-sdk-go-v2/service/ec2 v1.177.0 + github.com/aws/aws-sdk-go-v2/service/ecr v1.32.4 github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 github.com/aws/smithy-go v1.20.4 github.com/charmbracelet/bubbles v0.19.0 @@ -50,6 +51,7 @@ require ( github.com/hashicorp/hc-install v0.8.0 github.com/hashicorp/terraform-config-inspect v0.0.0-20240607080351-271db412dbcb github.com/hashicorp/terraform-exec v0.21.0 + github.com/jackc/pgx/v5 v5.7.1 github.com/mattn/go-isatty v0.0.20 github.com/mitchellh/mapstructure v1.5.0 github.com/novln/docker-parser v1.0.0 @@ -80,7 +82,7 @@ require ( go.uber.org/mock v0.4.0 go.uber.org/zap v1.27.0 golang.org/x/sync v0.8.0 - golang.org/x/text v0.17.0 + golang.org/x/text v0.18.0 gopkg.in/yaml.v3 v3.0.1 helm.sh/helm/v3 v3.15.4 k8s.io/api v0.30.3 @@ -106,7 +108,6 @@ require ( github.com/Microsoft/hcsshim v0.12.4 // indirect github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect github.com/aws/aws-sdk-go v1.54.6 // indirect - github.com/aws/aws-sdk-go-v2/service/ecr v1.32.4 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect github.com/blang/semver/v4 v4.0.0 // indirect @@ -130,6 +131,9 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.5 // indirect github.com/hashicorp/go-safetemp v1.0.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect @@ -191,7 +195,7 @@ require ( github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/cyphar/filepath-securejoin v0.2.5 // indirect - github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc github.com/docker/cli v26.1.4+incompatible // indirect github.com/docker/distribution v2.8.3+incompatible // indirect github.com/docker/docker v27.1.1+incompatible // indirect @@ -309,13 +313,13 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.starlark.net v0.0.0-20240520160348-046347dcd104 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.25.0 // indirect + golang.org/x/crypto v0.27.0 // indirect golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 golang.org/x/mod v0.19.0 // indirect golang.org/x/net v0.27.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect - golang.org/x/sys v0.24.0 // indirect - golang.org/x/term v0.22.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/term v0.24.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240624140628-dc46fd24d27d // indirect diff --git a/go.sum b/go.sum index ef7c1d1224..2ff9af7451 100644 --- a/go.sum +++ b/go.sum @@ -281,8 +281,6 @@ github.com/atotto/clipboard v0.1.4/go.mod h1:ZY9tmq7sm5xIbd9bOK4onWV4S6X0u6GY7Vn github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/aws/aws-sdk-go v1.54.6 h1:HEYUib3yTt8E6vxjMWM3yAq5b+qjj/6aKA62mkgux9g= github.com/aws/aws-sdk-go v1.54.6/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= -github.com/aws/aws-sdk-go-v2 v1.30.4 h1:frhcagrVNrzmT95RJImMHgabt99vkXGslubDaDagTk8= -github.com/aws/aws-sdk-go-v2 v1.30.4/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= github.com/aws/aws-sdk-go-v2 v1.30.5 h1:mWSRTwQAb0aLE17dSzztCVJWI9+cRMgqebndjwDyK0g= github.com/aws/aws-sdk-go-v2 v1.30.5/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= github.com/aws/aws-sdk-go-v2/config v1.27.31 h1:kxBoRsjhT3pq0cKthgj6RU6bXTm/2SgdoUMyrVw0rAI= @@ -291,12 +289,8 @@ github.com/aws/aws-sdk-go-v2/credentials v1.17.30 h1:aau/oYFtibVovr2rDt8FHlU17BT github.com/aws/aws-sdk-go-v2/credentials v1.17.30/go.mod h1:BPJ/yXV92ZVq6G8uYvbU0gSl8q94UB63nMT5ctNO38g= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 h1:yjwoSyDZF8Jth+mUk5lSPJCkMC0lMy6FaCD51jm6ayE= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12/go.mod h1:fuR57fAgMk7ot3WcNQfb6rSEn+SUffl7ri+aa8uKysI= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 h1:TNyt/+X43KJ9IJJMjKfa3bNTiZbUP7DeCxfbTROESwY= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16/go.mod h1:2DwJF39FlNAUiX5pAc0UNeiz16lK2t7IaFcm0LFHEgc= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17 h1:pI7Bzt0BJtYA0N/JEC6B8fJ4RBrEMi1LBrkMdFYNSnQ= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.17/go.mod h1:Dh5zzJYMtxfIjYW+/evjQ8uj2OyR/ve2KROHGHlSFqE= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY0L1/KftReOGxI/4NtVSTh9O/I= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17 h1:Mqr/V5gvrhA2gvgnF42Zh5iMiQNcOYthFYwCyrnuWlc= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.17/go.mod h1:aLJpZlCmjE+V+KtN1q1uyZkfnUWpQGpbsn89XPKyzfU= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= @@ -736,6 +730,14 @@ github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs= +github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= @@ -1106,8 +1108,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= -golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= -golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -1321,14 +1323,14 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= -golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= -golang.org/x/term v0.22.0 h1:BbsgPEJULsl2fV/AT3v15Mjva5yXKQDyKf+TbDz7QJk= -golang.org/x/term v0.22.0/go.mod h1:F3qCibpT5AMpCRfhfT53vVJwhLtIVHhB9XDjfFvnMI4= +golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM= +golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1339,8 +1341,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= -golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/pkg/ucp/dataprovider/factory.go b/pkg/ucp/dataprovider/factory.go index 8c7a867a68..e7026b6b6b 100644 --- a/pkg/ucp/dataprovider/factory.go +++ b/pkg/ucp/dataprovider/factory.go @@ -20,7 +20,9 @@ import ( context "context" "errors" "fmt" + "regexp" + "github.com/jackc/pgx/v5/pgxpool" "github.com/radius-project/radius/pkg/kubeutil" store "github.com/radius-project/radius/pkg/ucp/store" "github.com/radius-project/radius/pkg/ucp/store/apiserverstore" @@ -28,6 +30,7 @@ import ( "github.com/radius-project/radius/pkg/ucp/store/cosmosdb" "github.com/radius-project/radius/pkg/ucp/store/etcdstore" "github.com/radius-project/radius/pkg/ucp/store/inmemory" + "github.com/radius-project/radius/pkg/ucp/store/postgres" "k8s.io/apimachinery/pkg/runtime" runtimeclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -36,10 +39,11 @@ import ( type storageFactoryFunc func(context.Context, StorageProviderOptions, string) (store.StorageClient, error) var storageClientFactory = map[StorageProviderType]storageFactoryFunc{ - TypeAPIServer: initAPIServerClient, - TypeCosmosDB: initCosmosDBClient, - TypeETCD: InitETCDClient, - TypeInMemory: initInMemoryClient, + TypeAPIServer: initAPIServerClient, + TypeCosmosDB: initCosmosDBClient, + TypeETCD: InitETCDClient, + TypeInMemory: initInMemoryClient, + TypePostgreSQL: initPostgreSQLClient, } func initAPIServerClient(ctx context.Context, opt StorageProviderOptions, _ string) (store.StorageClient, error) { @@ -124,3 +128,25 @@ func InitETCDClient(ctx context.Context, opt StorageProviderOptions, _ string) ( func initInMemoryClient(ctx context.Context, opt StorageProviderOptions, _ string) (store.StorageClient, error) { return inmemory.NewClient(), nil } + +// initPostgreSQLClient creates a new PostgreSQL store client. +func initPostgreSQLClient(ctx context.Context, opt StorageProviderOptions, _ string) (store.StorageClient, error) { + if opt.PostgreSQL.URL == "" { + return nil, errors.New("failed to initialize PostgreSQL client: URL is required") + } + + url := opt.PostgreSQL.URL + regex := regexp.MustCompile(`$\{([a-zA-Z_]+)\}`) + matches := regex.FindSubmatch([]byte(opt.PostgreSQL.URL)) + if len(matches) > 1 { + // Extract the captured expression. + url = string(matches[1]) + } + + pool, err := pgxpool.New(ctx, url) + if err != nil { + return nil, fmt.Errorf("failed to initialize PostgreSQL client: %w", err) + } + + return postgres.NewPostgresClient(pool), nil +} diff --git a/pkg/ucp/dataprovider/options.go b/pkg/ucp/dataprovider/options.go index 4f18f9bd0f..95f99d323b 100644 --- a/pkg/ucp/dataprovider/options.go +++ b/pkg/ucp/dataprovider/options.go @@ -37,6 +37,9 @@ type StorageProviderOptions struct { // InMemory configures options for the in-memory store. Will be ignored if another store is configured. InMemory InMemoryOptions `yaml:"inmemory,omitempty"` + + // PostgreSQL configures options for connecting to a PostgreSQL database. Will be ignored if another store is configured. + PostgreSQL PostgreSQLOptions `yaml:"postgresql,omitempty"` } // APIServerOptions represents options for the configuring the Kubernetes APIServer store. @@ -71,3 +74,17 @@ type ETCDOptions struct { // InMemoryOptions represents options for the in-memory store. type InMemoryOptions struct{} + +// PostgreSQLOptions represents options for the PostgreSQL store. +type PostgreSQLOptions struct { + // URL is the connection information for the PostgreSQL database in URL format. + // + // The URL should be formatted according to: + // https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING-URIS + // + // The URL can contain secrets like passwords so it must be treated as sensitive. + // + // In place of the actual URL, you can substitute an environment variable by using the format: + // ${ENV_VAR_NAME} + URL string `yaml:"url"` +} diff --git a/pkg/ucp/dataprovider/types.go b/pkg/ucp/dataprovider/types.go index e113b12726..543ef8dfd8 100644 --- a/pkg/ucp/dataprovider/types.go +++ b/pkg/ucp/dataprovider/types.go @@ -37,6 +37,9 @@ const ( // TypeInMemory represents the in-memory provider. TypeInMemory StorageProviderType = "inmemory" + + // TypePostgreSQL represents the PostgreSQL provider. + TypePostgreSQL StorageProviderType = "postgresql" ) //go:generate mockgen -typed -destination=./mock_datastorage_provider.go -package=dataprovider -self_package github.com/radius-project/radius/pkg/ucp/dataprovider github.com/radius-project/radius/pkg/ucp/dataprovider DataStorageProvider diff --git a/pkg/ucp/store/inmemory/client.go b/pkg/ucp/store/inmemory/client.go index 17776e73dd..97adc07337 100644 --- a/pkg/ucp/store/inmemory/client.go +++ b/pkg/ucp/store/inmemory/client.go @@ -103,7 +103,7 @@ func (c *Client) Get(ctx context.Context, id string, options ...store.GetOptions return nil, &store.ErrInvalid{Message: "invalid argument. 'id' must refer to a named resource, not a collection"} } - normalized, err := storeutil.NormalizeResourceID(parsed) + converted, err := storeutil.ConvertScopeIDToResourceID(parsed) if err != nil { return nil, err } @@ -111,7 +111,7 @@ func (c *Client) Get(ctx context.Context, id string, options ...store.GetOptions c.mutex.Lock() defer c.mutex.Unlock() - entry, ok := c.resources[strings.ToLower(normalized.String())] + entry, ok := c.resources[strings.ToLower(converted.String())] if !ok { return nil, &store.ErrNotFound{ID: id} } @@ -141,7 +141,7 @@ func (c *Client) Delete(ctx context.Context, id string, options ...store.DeleteO return &store.ErrInvalid{Message: "invalid argument. 'id' must refer to a named resource, not a collection"} } - normalized, err := storeutil.NormalizeResourceID(parsed) + converted, err := storeutil.ConvertScopeIDToResourceID(parsed) if err != nil { return err } @@ -151,7 +151,7 @@ func (c *Client) Delete(ctx context.Context, id string, options ...store.DeleteO config := store.NewDeleteConfig(options...) - entry, ok := c.resources[strings.ToLower(normalized.String())] + entry, ok := c.resources[strings.ToLower(converted.String())] if !ok && config.ETag != "" { return &store.ErrConcurrency{} } else if !ok { @@ -160,7 +160,7 @@ func (c *Client) Delete(ctx context.Context, id string, options ...store.DeleteO return &store.ErrConcurrency{} } - delete(c.resources, strings.ToLower(normalized.String())) + delete(c.resources, strings.ToLower(converted.String())) return nil } @@ -189,7 +189,7 @@ func (c *Client) Query(ctx context.Context, query store.Query, options ...store. } // Check resource type. - resourceType, err := storeutil.NormalizeResourceType(query.ResourceType) + resourceType, err := storeutil.ConvertScopeTypeToResourceType(query.ResourceType) if err != nil { return nil, err } @@ -237,7 +237,7 @@ func (c *Client) Save(ctx context.Context, obj *store.Object, options ...store.S return &store.ErrInvalid{Message: "invalid argument. 'obj.ID' must be a valid resource id"} } - normalized, err := storeutil.NormalizeResourceID(parsed) + converted, err := storeutil.ConvertScopeIDToResourceID(parsed) if err != nil { return err } @@ -247,16 +247,16 @@ func (c *Client) Save(ctx context.Context, obj *store.Object, options ...store.S config := store.NewSaveConfig(options...) - entry, ok := c.resources[strings.ToLower(normalized.String())] + entry, ok := c.resources[strings.ToLower(converted.String())] if !ok && config.ETag != "" { return &store.ErrConcurrency{} } else if ok && config.ETag != "" && config.ETag != entry.obj.ETag { return &store.ErrConcurrency{} } else if !ok { // New entry, initialize it. - entry.rootScope = storeutil.NormalizePart(normalized.RootScope()) - entry.resourceType = storeutil.NormalizePart(normalized.Type()) - entry.routingScope = storeutil.NormalizePart(normalized.RoutingScope()) + entry.rootScope = storeutil.NormalizePart(converted.RootScope()) + entry.resourceType = storeutil.NormalizePart(converted.Type()) + entry.routingScope = storeutil.NormalizePart(converted.RoutingScope()) } raw, err := json.Marshal(obj.Data) @@ -275,7 +275,7 @@ func (c *Client) Save(ctx context.Context, obj *store.Object, options ...store.S entry.obj = *copy - c.resources[strings.ToLower(normalized.String())] = entry + c.resources[strings.ToLower(converted.String())] = entry return nil } diff --git a/pkg/ucp/store/postgres/postgresclient.go b/pkg/ucp/store/postgres/postgresclient.go new file mode 100644 index 0000000000..bdab94f031 --- /dev/null +++ b/pkg/ucp/store/postgres/postgresclient.go @@ -0,0 +1,422 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package postgres + +import ( + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/radius-project/radius/pkg/to" + "github.com/radius-project/radius/pkg/ucp/resources" + "github.com/radius-project/radius/pkg/ucp/store" + "github.com/radius-project/radius/pkg/ucp/store/storeutil" + "github.com/radius-project/radius/pkg/ucp/util/etag" +) + +// PostgresAPI defines the API surface from pgx that we use. This is used to allow for easier testing. +// +// Keep these definitions in sync with pgxpool.Pool and pgx.Conn. +type PostgresAPI interface { + // Exec executes a query without returning any rows. + Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) + // QueryRow executes a query that is expected to return at most one row. + QueryRow(ctx context.Context, sql string, args ...any) pgx.Row + // Query executes a query that returns rows. + Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) +} + +// NewPostgresClient creates a new PostgresClient. +func NewPostgresClient(api PostgresAPI) *PostgresClient { + return &PostgresClient{api: api} +} + +var _ store.StorageClient = (*PostgresClient)(nil) + +// PostgresClient is a storage client that uses Postgres as the backend. +type PostgresClient struct { + api PostgresAPI +} + +// Delete implements store.StorageClient. +func (p *PostgresClient) Delete(ctx context.Context, id string, options ...store.DeleteOptions) error { + if ctx == nil { + return &store.ErrInvalid{Message: "invalid argument. 'ctx' is required"} + } + + parsed, err := resources.Parse(id) + if err != nil { + return &store.ErrInvalid{Message: "invalid argument. 'id' must be a valid resource id"} + } + if parsed.IsEmpty() { + return &store.ErrInvalid{Message: "invalid argument. 'id' must not be empty"} + } + if parsed.IsResourceCollection() || parsed.IsScopeCollection() { + return &store.ErrInvalid{Message: "invalid argument. 'id' must refer to a named resource, not a collection"} + } + + converted, err := storeutil.ConvertScopeIDToResourceID(parsed) + if err != nil { + return err + } + + config := store.NewDeleteConfig(options...) + var etag *string + if config.ETag != "" { + etag = &config.ETag + } + + // We need different SQL for the case where an etag is provided vs not provided. + // + // The key behavior difference is that if an etag is provided, should report failure differently. + sql := ` +WITH deleted AS ( + DELETE FROM resources + WHERE id = $1 + RETURNING id +) +SELECT +CASE + WHEN EXISTS (SELECT 1 FROM deleted) THEN 'Success' + WHEN EXISTS (SELECT 1 FROM resources WHERE id = $1) THEN 'ErrConcurrency' + ELSE 'ErrNotFound' +END AS result;` + + args := []any{storeutil.NormalizePart(converted.String())} + + if config.ETag != "" { + // NOTE: we want to report ErrConcurrency for all failure cases here. This is what the tests do. + sql = ` +WITH deleted AS ( + DELETE FROM resources + WHERE id = $1 AND etag = $2 + RETURNING id +) +SELECT +CASE + WHEN EXISTS (SELECT 1 FROM deleted) THEN 'Success' + WHEN EXISTS (SELECT 1 FROM resources WHERE id = $1) THEN 'ErrConcurrency' + ELSE 'ErrConcurrency' +END AS result;` + + args = []any{storeutil.NormalizePart(converted.String()), etag} + } + + result := "" + err = p.api.QueryRow(ctx, sql, args...).Scan(&result) + if err != nil { + return err + } else if result == "ErrNotFound" { + return &store.ErrNotFound{ID: id} + } else if result == "ErrConcurrency" { + return &store.ErrConcurrency{} + } + + return nil +} + +// Get implements store.StorageClient. +func (p *PostgresClient) Get(ctx context.Context, id string, options ...store.GetOptions) (*store.Object, error) { + if ctx == nil { + return nil, &store.ErrInvalid{Message: "invalid argument. 'ctx' is required"} + } + + parsed, err := resources.Parse(id) + if err != nil { + return nil, &store.ErrInvalid{Message: "invalid argument. 'id' must be a valid resource id"} + } + if parsed.IsEmpty() { + return nil, &store.ErrInvalid{Message: "invalid argument. 'id' must not be empty"} + } + if parsed.IsResourceCollection() || parsed.IsScopeCollection() { + return nil, &store.ErrInvalid{Message: "invalid argument. 'id' must refer to a named resource, not a collection"} + } + + converted, err := storeutil.ConvertScopeIDToResourceID(parsed) + if err != nil { + return nil, err + } + + obj := store.Object{} + err = p.api.QueryRow( + ctx, + "SELECT original_id, etag, resource_data FROM resources WHERE id = $1", + storeutil.NormalizePart(converted.String())).Scan(&obj.ID, &obj.ETag, &obj.Data) + if errors.Is(err, pgx.ErrNoRows) { + return nil, &store.ErrNotFound{ID: id} + } else if err != nil { + return nil, err + } + + return &obj, nil +} + +// Query implements store.StorageClient. +func (p *PostgresClient) Query(ctx context.Context, query store.Query, options ...store.QueryOptions) (*store.ObjectQueryResult, error) { + if ctx == nil { + return nil, &store.ErrInvalid{Message: "invalid argument. 'ctx' is required"} + } + + err := query.Validate() + if err != nil { + return nil, &store.ErrInvalid{Message: fmt.Sprintf("invalid argument. Query is invalid: %s", err.Error())} + } + + config := store.NewQueryConfig(options...) + + // For a scope query, we need to perform the same normalization as we do for other operations on scopes. + resourceType := storeutil.NormalizePart(query.ResourceType) + if query.IsScopeQuery { + var err error + resourceType, err = storeutil.ConvertScopeTypeToResourceType(query.ResourceType) + if err != nil { + return nil, err + } + + resourceType = storeutil.NormalizePart(resourceType) + } + + var routingScopePrefixFilter *string + if query.RoutingScopePrefix != "" { + routingScopePrefixFilter = to.Ptr(storeutil.NormalizePart(query.RoutingScopePrefix)) + } + + var timestampFilter *string + if config.PaginationToken != "" { + ts, err := p.parsePaginationToken(config.PaginationToken) + if err != nil { + return nil, &store.ErrInvalid{Message: "invalid argument. 'query.PaginationToken' is invalid."} + } + timestampFilter = &ts + } + + var limitFilter *int + if config.MaxQueryItemCount > 0 { + limitFilter = &config.MaxQueryItemCount + } + + // For a scope query, we need to perform the same normalization as we do for other operations on scopes. + if query.IsScopeQuery { + var err error + query.ResourceType, err = storeutil.ConvertScopeTypeToResourceType(query.ResourceType) + if err != nil { + return nil, err + } + } + + // NOTE: building SQL by concatenating strings is hard to do safely and should be avoided. + // If you need to work on this code MAKE SURE you use SQL parameters + // for any user input. + sql := ` +SELECT original_id, etag, resource_data, created_at +FROM resources +WHERE ((root_scope = $1) OR ($2 AND (root_scope LIKE $1 || '%'))) AND + resource_type = $3 AND + ((routing_scope LIKE $4 || '%') OR $4 IS NULL) AND + (created_at > $5::TIMESTAMP OR $5 IS NULL) +ORDER BY created_at ASC +LIMIT $6` + + args := []any{ + // If ScopeRecursive is false, the RootScope must match exactly. + // If ScopeRecursive is true, the RootScope must be a prefix of the stored RootScope. + storeutil.NormalizePart(query.RootScope), + query.ScopeRecursive, + resourceType, + routingScopePrefixFilter, // RoutingScopePrefix is optional and always treated as as prefix. + timestampFilter, // Optional for pagination. + limitFilter, // NOTE: Postgres allows LIMIT to be set with a NULL value to mean no limit. + } + + rows, err := p.api.Query(ctx, sql, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + // Capture the last timestamp so we can use it for pagination. + var timestamp *time.Time + + result := store.ObjectQueryResult{} + for rows.Next() { + obj := store.Object{} + err := rows.Scan(&obj.ID, &obj.ETag, &obj.Data, ×tamp) + if err != nil { + return nil, err + } + + // We could improve this by moving the filter logic to the SQL query. + // + // The problem is that the current filter logic is not well documented or tested, and + // we want to stay compatible with the existing implementation for now. + match, err := obj.MatchesFilters(query.Filters) + if err != nil { + return nil, err + } else if !match { + continue + } + + result.Items = append(result.Items, obj) + } + + err = rows.Err() + if err != nil { + return nil, err + } + + if len(result.Items) < config.MaxQueryItemCount && config.MaxQueryItemCount > 0 { + // No more rows, so no need for pagination. + return &result, nil + } + + if timestamp != nil { + // Will be empty if there were no rows. + token, err := p.createPaginationToken(*timestamp) + if err != nil { + return nil, err + } + result.PaginationToken = token + } + + return &result, nil +} + +// Save implements store.StorageClient. +func (p *PostgresClient) Save(ctx context.Context, obj *store.Object, options ...store.SaveOptions) error { + if ctx == nil { + return &store.ErrInvalid{Message: "invalid argument. 'ctx' is required"} + } + if obj == nil { + return &store.ErrInvalid{Message: "invalid argument. 'obj' is required"} + } + + parsed, err := resources.Parse(obj.ID) + if err != nil { + return &store.ErrInvalid{Message: "invalid argument. 'obj.ID' must be a valid resource id"} + } + if parsed.IsEmpty() { + return &store.ErrInvalid{Message: "invalid argument. 'obj.ID' must not be empty"} + } + if parsed.IsResourceCollection() || parsed.IsScopeCollection() { + return &store.ErrInvalid{Message: "invalid argument. 'obj.ID' must refer to a named resource, not a collection"} + } + + converted, err := storeutil.ConvertScopeIDToResourceID(parsed) + if err != nil { + return err + } + + config := store.NewSaveConfig(options...) + + // Compute ETag for the current state of the object. + raw, err := json.Marshal(obj.Data) + if err != nil { + return err + } + + obj.ETag = etag.New(raw) + + // We need different SQL for the case where an etag is provided vs not provided. + // + // The key behavior difference is that if an etag is provided, we should not perform inserts, only updates. + + // This is the more complex query that handles "upserts". It does not process etags. + sql := ` +WITH updated AS ( + INSERT INTO resources (id, original_id, resource_type, root_scope, routing_scope, etag, resource_data) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (id) + DO UPDATE SET resource_data = $7 + RETURNING id +) +SELECT +CASE + WHEN EXISTS (SELECT 1 FROM updated) THEN 'Success' + WHEN EXISTS (SELECT 1 FROM resources WHERE id = $1) THEN 'ErrConcurrency' + ELSE 'ErrNotFound' +END AS result;` + + args := []any{ + storeutil.NormalizePart(converted.String()), + obj.ID, // MUST NOT BE NORMALIZED. Preserve the original casing and format. + storeutil.NormalizePart(converted.Type()), + storeutil.NormalizePart(converted.RootScope()), + storeutil.NormalizePart(converted.RoutingScope()), + obj.ETag, + obj.Data, + } + + if config.ETag != "" { + // This is the simpler query that only performs updates. It requires an etag. + // NOTE: we want to report ErrConcurrency for all failure cases here. This is what the tests do. + sql = ` +WITH updated AS ( + UPDATE resources SET resource_data = $2 + WHERE id = $1 AND etag = $3 + RETURNING id +) +SELECT +CASE + WHEN EXISTS (SELECT 1 FROM updated) THEN 'Success' + WHEN EXISTS (SELECT 1 FROM resources WHERE id = $1) THEN 'ErrConcurrency' + ELSE 'ErrConcurrency' +END AS result;` + + args = []any{storeutil.NormalizePart(converted.String()), obj.Data, config.ETag} + } + + result := "" + err = p.api.QueryRow(ctx, sql, args...).Scan(&result) + if err != nil { + return err + } else if result == "ErrNotFound" { + return &store.ErrNotFound{ID: obj.ID} + } else if result == "ErrConcurrency" { + return &store.ErrConcurrency{} + } + + return nil +} + +// createPaginationToken converts a timestamp to a base64 encoded string. +// +// We use ISO8601/RFC3339 format which postgres understands and can be used for comparison. +// We also add microseconds to the timestamp to ensure uniqueness. +func (p *PostgresClient) createPaginationToken(timestamp time.Time) (string, error) { + return base64.StdEncoding.EncodeToString([]byte(timestamp.UTC().Format(time.RFC3339Nano))), nil +} + +// parsePaginationToken converts a base64 encoded string to a timestamp. +func (p *PostgresClient) parsePaginationToken(token string) (string, error) { + data, err := base64.StdEncoding.DecodeString(token) + if err != nil { + return "", err + } + + // Roundtripping to ensure that we understand the data. + parsed, err := time.Parse(time.RFC3339Nano, string(data)) + if err != nil { + return "", err + } + + return parsed.UTC().Format(time.RFC3339Nano), nil +} diff --git a/pkg/ucp/store/postgres/postgresclient_test.go b/pkg/ucp/store/postgres/postgresclient_test.go new file mode 100644 index 0000000000..45b749f27e --- /dev/null +++ b/pkg/ucp/store/postgres/postgresclient_test.go @@ -0,0 +1,87 @@ +/* +Copyright 2023 The Radius Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package postgres + +import ( + "context" + "os" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/stretchr/testify/require" + + "github.com/davecgh/go-spew/spew" + "github.com/radius-project/radius/test/testcontext" + shared "github.com/radius-project/radius/test/ucp/storetest" +) + +func Test_PostgresClient(t *testing.T) { + ctx, cancel := testcontext.NewWithCancel(t) + t.Cleanup(cancel) + + // You can get the right value for this by running the command: make db-init + url := os.Getenv("TEST_POSTGRES_URL") + if url == "" { + t.Skip("TEST_POSTGRES_URL is not set.") + return + } + + pool, err := pgxpool.New(ctx, url) + require.NoError(t, err) + + logger := postgresLogger{t: t, pool: pool} + client := NewPostgresClient(&logger) + + clear := func(t *testing.T) { + tag, err := pool.Exec(ctx, "DELETE FROM resources") + require.NoError(t, err) + t.Logf("Database reset ... %d rows deleted", tag.RowsAffected()) + } + + // The actual test logic lives in a shared package, we're just doing the setup here. + shared.RunTest(t, client, clear) +} + +var _ PostgresAPI = (*postgresLogger)(nil) + +type postgresLogger struct { + t *testing.T + pool *pgxpool.Pool +} + +// Exec implements PostgresAPI. +func (l *postgresLogger) Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error) { + l.t.Logf("Executing: %s", sql) + l.t.Logf("Args:\n%s", spew.Sdump(args...)) + return l.pool.Exec(ctx, sql, args...) +} + +// Query implements PostgresAPI. +func (l *postgresLogger) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) { + l.t.Logf("Executing: %s", sql) + l.t.Logf("Args:\n%s", spew.Sdump(args...)) + return l.pool.Query(ctx, sql, args...) +} + +// QueryRow implements PostgresAPI. +func (l *postgresLogger) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row { + l.t.Logf("Executing: %s", sql) + l.t.Logf("Args:\n%s", spew.Sdump(args...)) + return l.pool.QueryRow(ctx, sql, args...) +} diff --git a/pkg/ucp/store/storeutil/id.go b/pkg/ucp/store/storeutil/id.go index d981261480..4b72a54624 100644 --- a/pkg/ucp/store/storeutil/id.go +++ b/pkg/ucp/store/storeutil/id.go @@ -99,14 +99,14 @@ func NormalizePart(part string) string { return strings.ToLower(part) } -// NormalizeResourceID normalizes the resource ID to be consistent between scopes and resources. +// ConvertScopeIDToResourceID normalizes the resource ID to be consistent between scopes and resources. // // For a resource id that identifies a resource, it is already in normalized form. // // - eg: "/planes/radius/local/resourceGroups/my-rg/providers/Applications.Core/applications/my-app" is already // normalized. // - eg: "/planes/radius/local/resourceGroups/my-rg" needs normalization. -func NormalizeResourceID(parsed resources.ID) (resources.ID, error) { +func ConvertScopeIDToResourceID(parsed resources.ID) (resources.ID, error) { // This function normalizes the resource ID to be consistent between scopes and resources. // // For a resource id that identifies a resource, it is already in normalized form. @@ -164,14 +164,14 @@ func NormalizeResourceID(parsed resources.ID) (resources.ID, error) { return resources.ID{}, fmt.Errorf("invalid resource id: %s", parsed.String()) } -// NormalizeResourceType normalizes the resource type to be consistent between scopes and resources. -// See comments on NormalizeResourceID for full context. +// ConvertScopeTypeToResourceType normalizes the resource type to be consistent between scopes and resources. +// See comments on ConvertScopeIDToResourceID for full context. // // For a resource type that identifies a resource, it is already in normalized form. // // - eg: "Applications.Core/applications" is already normalized. // - eg: "resourceGroups" needs normalization. -func NormalizeResourceType(resourceType string) (string, error) { +func ConvertScopeTypeToResourceType(resourceType string) (string, error) { if strings.Contains(resourceType, "/") { // Already normalized. return resourceType, nil diff --git a/pkg/ucp/store/storeutil/id_test.go b/pkg/ucp/store/storeutil/id_test.go index 7be9827506..7fe40de4cb 100644 --- a/pkg/ucp/store/storeutil/id_test.go +++ b/pkg/ucp/store/storeutil/id_test.go @@ -295,7 +295,7 @@ func Test_NormalizePart(t *testing.T) { } } -func Test_NormalizeResourceID(t *testing.T) { +func Test_ConvertScopeIDToResourceID(t *testing.T) { type testcase struct { Input string Expected string @@ -344,7 +344,7 @@ func Test_NormalizeResourceID(t *testing.T) { id, err := resources.Parse(tc.Input) require.NoError(t, err) - result, err := NormalizeResourceID(id) + result, err := ConvertScopeIDToResourceID(id) if tc.IsError { require.Error(t, err) } else { @@ -355,7 +355,7 @@ func Test_NormalizeResourceID(t *testing.T) { } } -func Test_NormalizeResourceType(t *testing.T) { +func Test_ConvertScopeTypeToResourceType(t *testing.T) { type testcase struct { Input string Expected string @@ -396,7 +396,7 @@ func Test_NormalizeResourceType(t *testing.T) { for _, tc := range cases { t.Run(tc.Input, func(t *testing.T) { - result, err := NormalizeResourceType(tc.Input) + result, err := ConvertScopeTypeToResourceType(tc.Input) if tc.IsError { require.Error(t, err) } else {