From 3881a25540a29d7ac0c42535177c12ad9ebcc986 Mon Sep 17 00:00:00 2001 From: Vladyslav Budichenko Date: Fri, 13 Sep 2024 13:45:13 -0400 Subject: [PATCH] feat: add interface to support different types of databases --- DEVELOPMENT.md | 4 +- architecture/ARCHITECTURE.md | 2 +- architecture/MIGRATIONS.MD | 2 +- clients/database/database_test.go | 14 - clients/database/empty/database.go | 40 +++ clients/database/interface.go | 31 +- .../{postgres.go => postgres/database.go} | 40 +-- clients/database/postgres/database_test.go | 18 ++ .../{database.go => postgres/migrate.go} | 10 +- clients/database/postgres/migrate_test.go | 15 + ...3_add_proxied_request_metrics_table.up.sql | 0 ...roxied_request_metrics_method_index.up.sql | 0 ..._request_metrics_block_number_index.up.sql | 0 ...est_metrics_ip_and_hostname_columns.up.sql | 0 ...xied_request_metrics_hostname_index.up.sql | 0 ...rics_user_agent_and_referer_columns.up.sql | 0 ...oxied_request_metrics_origin_column.up.sql | 0 ..._request_metrics_request_time_index.up.sql | 0 ...dd_proxied_request_metrics_id_index.up.sql | 0 ...ition_proxied_request_metrics_table.up.sql | 0 ...20231013122742_add_response_backend.up.sql | 0 ...ied_request_metrics_cachehit_column.up.sql | 0 ..._request_metrics_partofbatch_column.up.sql | 0 .../{ => postgres}/migrations/main.go | 0 clients/database/postgres/partition_test.go | 55 ++++ clients/database/postgres/partition.go | 260 +++++++++++++++++ .../database/{ => postgres}/request_metric.go | 81 ++---- .../database/postgres/request_metric_test.go | 44 +++ clients/database/postgres/types.go | 67 +++++ clients/database/postgres_test.go | 25 -- clients/database/request_metric_test.go | 37 --- main_batch_test.go | 4 +- main_test.go | 35 +-- routines/metric_compaction.go | 14 +- routines/metric_partitioning.go | 267 +----------------- routines/metric_partitioning_test.go | 57 +--- routines/metric_pruning.go | 10 +- service/handlers.go | 9 +- service/middleware.go | 6 +- service/service.go | 36 ++- 40 files changed, 645 insertions(+), 538 deletions(-) delete mode 100644 clients/database/database_test.go create mode 100644 clients/database/empty/database.go rename clients/database/{postgres.go => postgres/database.go} (79%) create mode 100644 clients/database/postgres/database_test.go rename clients/database/{database.go => postgres/migrate.go} (90%) create mode 100644 clients/database/postgres/migrate_test.go rename clients/database/{ => postgres}/migrations/20230306182203_add_proxied_request_metrics_table.up.sql (100%) rename clients/database/{ => postgres}/migrations/20230310154711_add_proxied_request_metrics_method_index.up.sql (100%) rename clients/database/{ => postgres}/migrations/20230310154712_add_proxied_request_metrics_block_number_index.up.sql (100%) rename clients/database/{ => postgres}/migrations/20230320161721_add_proxied_request_metrics_ip_and_hostname_columns.up.sql (100%) rename clients/database/{ => postgres}/migrations/20230320161722_add_proxied_request_metrics_hostname_index.up.sql (100%) rename clients/database/{ => postgres}/migrations/20230410142045_add_proxied_request_metrics_user_agent_and_referer_columns.up.sql (100%) rename clients/database/{ => postgres}/migrations/20230410153743_add_proxied_request_metrics_origin_column.up.sql (100%) rename clients/database/{ => postgres}/migrations/20230510135051_add_proxied_request_metrics_request_time_index.up.sql (100%) rename clients/database/{ => postgres}/migrations/20230512150351_add_proxied_request_metrics_id_index.up.sql (100%) rename
clients/database/{ => postgres}/migrations/20230523101344_partition_proxied_request_metrics_table.up.sql (100%) rename clients/database/{ => postgres}/migrations/20231013122742_add_response_backend.up.sql (100%) rename clients/database/{ => postgres}/migrations/20231027165300_add_proxied_request_metrics_cachehit_column.up.sql (100%) rename clients/database/{ => postgres}/migrations/20240415111242_add_proxied_request_metrics_partofbatch_column.up.sql (100%) rename clients/database/{ => postgres}/migrations/main.go (100%) create mode 100644 clients/database/postgres/partition_test.go create mode 100644 clients/database/postgres/partition.go rename clients/database/{ => postgres}/request_metric.go (59%) create mode 100644 clients/database/postgres/request_metric_test.go create mode 100644 clients/database/postgres/types.go delete mode 100644 clients/database/postgres_test.go delete mode 100644 clients/database/request_metric_test.go diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 06e54e9..cb470d1 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -131,7 +131,7 @@ make it p=".*Eth_getBlockByNumberRequest" ## Migrations -On startup the proxy service will run any SQL based migration in the [migrations folder](./clients/database/migrations) that haven't already been run against the database being used. +On startup the proxy service will run any SQL-based migrations in the [migrations folder](clients/database/postgres/migrations) that haven't already been run against the database being used. For lower level details on how the migration process works consult [these docs](https://bun.uptrace.dev/guide/migrations.html). @@ -144,7 +144,7 @@ $ date '+%Y%m%d%H%M%S' > 20230306182227 ``` -Add new SQL file with commands to run in the new migration (add/delete/modify tables and or indices) in the in the [migrations folder](./clients/database/migrations) +Add a new SQL file with the commands to run in the new migration (add/delete/modify tables and/or indices) in the [migrations folder](clients/database/postgres/migrations) ### Running migrations diff --git a/architecture/ARCHITECTURE.md b/architecture/ARCHITECTURE.md index dfe953b..35f8e0e 100644 --- a/architecture/ARCHITECTURE.md +++ b/architecture/ARCHITECTURE.md @@ -20,7 +20,7 @@ The proxy functionality provides the foundation for all other proxy service feat ![API Observability Worfklow Conceptual Overview](./images/observability_workflow_conceptual.jpg) -For every request that is proxied by the proxy service, a [request metric](../decode/evm_rpc.go) is created and stored in a [postgres table](../clients/database/migrations/20230306182203_add_proxied_request_metrics_table.up.sql) that can be aggregated with other request metrics over a time range to answer ad hoc questions such as: +For every request that is proxied by the proxy service, a [request metric](../decode/evm_rpc.go) is created and stored in a [postgres table](../clients/database/postgres/migrations/20230306182203_add_proxied_request_metrics_table.up.sql) that can be aggregated with other request metrics over a time range to answer ad hoc questions such as: - what methods take the longest time? - what methods are called the most frequently?
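The documentation changes above point at the relocated migrations folder; the `migrations/main.go` registry moves with the SQL files, but since it is a 100% rename its contents do not appear in this patch. As a rough sketch only, a bun migration registry of the kind these docs rely on typically looks like the following (the actual contents of the repository's file are an assumption here, not shown by the diff):

```go
package migrations

import "github.com/uptrace/bun/migrate"

// Migrations collects every SQL migration in this directory so the
// proxy service can run the not-yet-applied ones at startup.
var Migrations = migrate.NewMigrations()

func init() {
	// DiscoverCaller registers the timestamp-prefixed *.up.sql files
	// that live alongside this source file.
	if err := Migrations.DiscoverCaller(); err != nil {
		panic(err)
	}
}
```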
diff --git a/architecture/MIGRATIONS.MD b/architecture/MIGRATIONS.MD index 419b609..8fb2716 100644 --- a/architecture/MIGRATIONS.MD +++ b/architecture/MIGRATIONS.MD @@ -14,7 +14,7 @@ Setting an environment variable named `RUN_DATABASE_MIGRATIONS` to true will cau ### Migration Format -New migration files must be placed in the [migrations directory](../clients/database/migrations/), have a unique name, and start with a timestamp in the below format: +New migration files must be placed in the [migrations directory](../clients/database/postgres/migrations/), have a unique name, and start with a timestamp in the below format: ```bash $ date '+%Y%m%d%H%M%S' diff --git a/clients/database/database_test.go b/clients/database/database_test.go deleted file mode 100644 index 20e779d..0000000 --- a/clients/database/database_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package database - -import ( - "context" - "github.com/stretchr/testify/require" - "github.com/uptrace/bun/migrate" - "testing" -) - -func TestMigrateNoDatabase(t *testing.T) { - migrations, err := Migrate(context.Background(), nil, migrate.Migrations{}, nil) - require.NoError(t, err) - require.Empty(t, migrations) -} diff --git a/clients/database/empty/database.go b/clients/database/empty/database.go new file mode 100644 index 0000000..ce844dd --- /dev/null +++ b/clients/database/empty/database.go @@ -0,0 +1,40 @@ +package empty + +import ( + "context" + "github.com/kava-labs/kava-proxy-service/clients/database" +) + +type Empty struct{} + +func New() *Empty { + return &Empty{} +} + +func (e *Empty) SaveProxiedRequestMetric(ctx context.Context, metric *database.ProxiedRequestMetric) error { + return nil +} + +func (e *Empty) ListProxiedRequestMetricsWithPagination(ctx context.Context, cursor int64, limit int) ([]*database.ProxiedRequestMetric, int64, error) { + return []*database.ProxiedRequestMetric{}, 0, nil +} + +func (e *Empty) CountAttachedProxiedRequestMetricPartitions(ctx context.Context) (int64, error) { + return 0, nil +} + +func (e *Empty) GetLastCreatedAttachedProxiedRequestMetricsPartitionName(ctx context.Context) (string, error) { + return "", nil +} + +func (e *Empty) DeleteProxiedRequestMetricsOlderThanNDays(ctx context.Context, n int64) error { + return nil +} + +func (e *Empty) HealthCheck() error { + return nil +} + +func (e *Empty) Partition(prefillPeriodDays int) error { + return nil +} diff --git a/clients/database/interface.go b/clients/database/interface.go index 2d5587e..2a8c0b5 100644 --- a/clients/database/interface.go +++ b/clients/database/interface.go @@ -1,11 +1,34 @@ package database -import "context" +import ( + "context" + "time" +) type MetricsDatabase interface { - SaveProxiedRequestMetric(ctx context.Context, prm *ProxiedRequestMetric) error - ListProxiedRequestMetricsWithPagination(ctx context.Context, cursor int64, limit int) ([]ProxiedRequestMetric, int64, error) + SaveProxiedRequestMetric(ctx context.Context, metric *ProxiedRequestMetric) error + ListProxiedRequestMetricsWithPagination(ctx context.Context, cursor int64, limit int) ([]*ProxiedRequestMetric, int64, error) CountAttachedProxiedRequestMetricPartitions(ctx context.Context) (int64, error) GetLastCreatedAttachedProxiedRequestMetricsPartitionName(ctx context.Context) (string, error) - DeleteProxiedRequestMetricsOlderThanNDays(ctx context.Context, days int) error + DeleteProxiedRequestMetricsOlderThanNDays(ctx context.Context, n int64) error + + HealthCheck() error + Partition(prefillPeriodDays int) error +} + +type ProxiedRequestMetric struct { 
+ ID int64 + MethodName string + BlockNumber *int64 + ResponseLatencyMilliseconds int64 + Hostname string + RequestIP string + RequestTime time.Time + UserAgent *string + Referer *string + Origin *string + ResponseBackend string + ResponseBackendRoute string + CacheHit bool + PartOfBatch bool } diff --git a/clients/database/postgres.go b/clients/database/postgres/database.go similarity index 79% rename from clients/database/postgres.go rename to clients/database/postgres/database.go index 02b0195..06bd279 100644 --- a/clients/database/postgres.go +++ b/clients/database/postgres/database.go @@ -1,4 +1,4 @@ -package database +package postgres import ( "crypto/tls" @@ -13,12 +13,9 @@ import ( "github.com/uptrace/bun/extra/bundebug" ) -// PostgresDatabaseConfig contains values for creating a +// DatabaseConfig contains values for creating a // new connection to a postgres database -type PostgresDatabaseConfig struct { - // DatabaseDisabled is used to disable the database, and it won't be used at all. All operations will be skipped. - DatabaseDisabled bool - +type DatabaseConfig struct { DatabaseName string DatabaseEndpointURL string DatabaseUsername string @@ -34,21 +31,15 @@ type PostgresDatabaseConfig struct { Logger *logging.ServiceLogger } -// PostgresClient wraps a connection to a postgres database -type PostgresClient struct { - isDisabled bool - *bun.DB +// Client wraps a connection to a postgres database +type Client struct { + db *bun.DB + logger *logging.ServiceLogger } -// NewPostgresClient returns a new connection to the specified +// NewClient returns a new connection to the specified // postgres data and error (if any) -func NewPostgresClient(config PostgresDatabaseConfig) (PostgresClient, error) { - if config.DatabaseDisabled { - return PostgresClient{ - isDisabled: true, - }, nil - } - +func NewClient(config DatabaseConfig) (Client, error) { // configure postgres database connection options var pgOptions *pgdriver.Connector @@ -96,17 +87,14 @@ func NewPostgresClient(config PostgresDatabaseConfig) (PostgresClient, error) { db.AddQueryHook(bundebug.NewQueryHook(bundebug.WithVerbose(true))) } - return PostgresClient{ - DB: db, + return Client{ + db: db, + logger: config.Logger, }, nil } // HealthCheck returns an error if the database can not // be connected to and queried, nil otherwise -func (pg *PostgresClient) HealthCheck() error { - if pg.isDisabled { - return nil - } - - return pg.Ping() +func (c *Client) HealthCheck() error { + return c.db.Ping() } diff --git a/clients/database/postgres/database_test.go b/clients/database/postgres/database_test.go new file mode 100644 index 0000000..d1cd9af --- /dev/null +++ b/clients/database/postgres/database_test.go @@ -0,0 +1,18 @@ +package postgres + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestDisabledDBCreation(t *testing.T) { + config := DatabaseConfig{} + _, err := NewClient(config) + require.Error(t, err) +} + +func TestHealthcheckNoDatabase(t *testing.T) { + config := DatabaseConfig{} + _, err := NewClient(config) + require.Error(t, err) +} diff --git a/clients/database/database.go b/clients/database/postgres/migrate.go similarity index 90% rename from clients/database/database.go rename to clients/database/postgres/migrate.go index bdff4e0..32c30cf 100644 --- a/clients/database/database.go +++ b/clients/database/postgres/migrate.go @@ -1,4 +1,4 @@ -package database +package postgres import ( "context" @@ -6,7 +6,6 @@ import ( "time" "github.com/kava-labs/kava-proxy-service/logging" - 
"github.com/uptrace/bun" "github.com/uptrace/bun/migrate" ) @@ -15,12 +14,9 @@ import ( // returning error (if any) and a list of migrations that have been // run and any that were not // If db is nil, returns empty slice and nil error, as there is no database to migrate. -func Migrate(ctx context.Context, db *bun.DB, migrations migrate.Migrations, logger *logging.ServiceLogger) (*migrate.MigrationSlice, error) { - if db == nil { - return &migrate.MigrationSlice{}, nil - } +func (c *Client) Migrate(ctx context.Context, migrations migrate.Migrations, logger *logging.ServiceLogger) (*migrate.MigrationSlice, error) { // set up migration config - migrator := migrate.NewMigrator(db, &migrations) + migrator := migrate.NewMigrator(c.db, &migrations) // create / verify tables used to tack migrations err := migrator.Init(ctx) diff --git a/clients/database/postgres/migrate_test.go b/clients/database/postgres/migrate_test.go new file mode 100644 index 0000000..bd0f9cb --- /dev/null +++ b/clients/database/postgres/migrate_test.go @@ -0,0 +1,15 @@ +package postgres + +import ( + "context" + "github.com/stretchr/testify/require" + "github.com/uptrace/bun/migrate" + "testing" +) + +func TestMigrateNoDatabase(t *testing.T) { + db := &Client{} + + _, err := db.Migrate(context.Background(), migrate.Migrations{}, nil) + require.Error(t, err) +} diff --git a/clients/database/migrations/20230306182203_add_proxied_request_metrics_table.up.sql b/clients/database/postgres/migrations/20230306182203_add_proxied_request_metrics_table.up.sql similarity index 100% rename from clients/database/migrations/20230306182203_add_proxied_request_metrics_table.up.sql rename to clients/database/postgres/migrations/20230306182203_add_proxied_request_metrics_table.up.sql diff --git a/clients/database/migrations/20230310154711_add_proxied_request_metrics_method_index.up.sql b/clients/database/postgres/migrations/20230310154711_add_proxied_request_metrics_method_index.up.sql similarity index 100% rename from clients/database/migrations/20230310154711_add_proxied_request_metrics_method_index.up.sql rename to clients/database/postgres/migrations/20230310154711_add_proxied_request_metrics_method_index.up.sql diff --git a/clients/database/migrations/20230310154712_add_proxied_request_metrics_block_number_index.up.sql b/clients/database/postgres/migrations/20230310154712_add_proxied_request_metrics_block_number_index.up.sql similarity index 100% rename from clients/database/migrations/20230310154712_add_proxied_request_metrics_block_number_index.up.sql rename to clients/database/postgres/migrations/20230310154712_add_proxied_request_metrics_block_number_index.up.sql diff --git a/clients/database/migrations/20230320161721_add_proxied_request_metrics_ip_and_hostname_columns.up.sql b/clients/database/postgres/migrations/20230320161721_add_proxied_request_metrics_ip_and_hostname_columns.up.sql similarity index 100% rename from clients/database/migrations/20230320161721_add_proxied_request_metrics_ip_and_hostname_columns.up.sql rename to clients/database/postgres/migrations/20230320161721_add_proxied_request_metrics_ip_and_hostname_columns.up.sql diff --git a/clients/database/migrations/20230320161722_add_proxied_request_metrics_hostname_index.up.sql b/clients/database/postgres/migrations/20230320161722_add_proxied_request_metrics_hostname_index.up.sql similarity index 100% rename from clients/database/migrations/20230320161722_add_proxied_request_metrics_hostname_index.up.sql rename to 
clients/database/postgres/migrations/20230320161722_add_proxied_request_metrics_hostname_index.up.sql diff --git a/clients/database/migrations/20230410142045_add_proxied_request_metrics_user_agent_and_referer_columns.up.sql b/clients/database/postgres/migrations/20230410142045_add_proxied_request_metrics_user_agent_and_referer_columns.up.sql similarity index 100% rename from clients/database/migrations/20230410142045_add_proxied_request_metrics_user_agent_and_referer_columns.up.sql rename to clients/database/postgres/migrations/20230410142045_add_proxied_request_metrics_user_agent_and_referer_columns.up.sql diff --git a/clients/database/migrations/20230410153743_add_proxied_request_metrics_origin_column.up.sql b/clients/database/postgres/migrations/20230410153743_add_proxied_request_metrics_origin_column.up.sql similarity index 100% rename from clients/database/migrations/20230410153743_add_proxied_request_metrics_origin_column.up.sql rename to clients/database/postgres/migrations/20230410153743_add_proxied_request_metrics_origin_column.up.sql diff --git a/clients/database/migrations/20230510135051_add_proxied_request_metrics_request_time_index.up.sql b/clients/database/postgres/migrations/20230510135051_add_proxied_request_metrics_request_time_index.up.sql similarity index 100% rename from clients/database/migrations/20230510135051_add_proxied_request_metrics_request_time_index.up.sql rename to clients/database/postgres/migrations/20230510135051_add_proxied_request_metrics_request_time_index.up.sql diff --git a/clients/database/migrations/20230512150351_add_proxied_request_metrics_id_index.up.sql b/clients/database/postgres/migrations/20230512150351_add_proxied_request_metrics_id_index.up.sql similarity index 100% rename from clients/database/migrations/20230512150351_add_proxied_request_metrics_id_index.up.sql rename to clients/database/postgres/migrations/20230512150351_add_proxied_request_metrics_id_index.up.sql diff --git a/clients/database/migrations/20230523101344_partition_proxied_request_metrics_table.up.sql b/clients/database/postgres/migrations/20230523101344_partition_proxied_request_metrics_table.up.sql similarity index 100% rename from clients/database/migrations/20230523101344_partition_proxied_request_metrics_table.up.sql rename to clients/database/postgres/migrations/20230523101344_partition_proxied_request_metrics_table.up.sql diff --git a/clients/database/migrations/20231013122742_add_response_backend.up.sql b/clients/database/postgres/migrations/20231013122742_add_response_backend.up.sql similarity index 100% rename from clients/database/migrations/20231013122742_add_response_backend.up.sql rename to clients/database/postgres/migrations/20231013122742_add_response_backend.up.sql diff --git a/clients/database/migrations/20231027165300_add_proxied_request_metrics_cachehit_column.up.sql b/clients/database/postgres/migrations/20231027165300_add_proxied_request_metrics_cachehit_column.up.sql similarity index 100% rename from clients/database/migrations/20231027165300_add_proxied_request_metrics_cachehit_column.up.sql rename to clients/database/postgres/migrations/20231027165300_add_proxied_request_metrics_cachehit_column.up.sql diff --git a/clients/database/migrations/20240415111242_add_proxied_request_metrics_partofbatch_column.up.sql b/clients/database/postgres/migrations/20240415111242_add_proxied_request_metrics_partofbatch_column.up.sql similarity index 100% rename from clients/database/migrations/20240415111242_add_proxied_request_metrics_partofbatch_column.up.sql 
rename to clients/database/postgres/migrations/20240415111242_add_proxied_request_metrics_partofbatch_column.up.sql diff --git a/clients/database/migrations/main.go b/clients/database/postgres/migrations/main.go similarity index 100% rename from clients/database/migrations/main.go rename to clients/database/postgres/migrations/main.go diff --git a/clients/database/postgres/partition_test.go b/clients/database/postgres/partition_test.go new file mode 100644 index 0000000..8e8d1f0 --- /dev/null +++ b/clients/database/postgres/partition_test.go @@ -0,0 +1,55 @@ +package postgres + +import ( + "github.com/kava-labs/kava-proxy-service/config" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestUnitTestPartitionsForPeriodReturnsExpectedNumPartitionsWhenPrefillPeriodIsNotContainedInCurrentMonth(t *testing.T) { + // prepare + + // pick a date in the middle of a month + startFrom := time.Date(1989, 5, 20, 12, 0, 0, 0, time.UTC) + + // set prefill period to more than days remaining in month + // from above date + daysToPrefill := 21 + + // execute + actualPartitionsForPeriod, err := PartitionsForPeriod(startFrom, daysToPrefill) + + // assert + assert.Nil(t, err) + assert.Equal(t, daysToPrefill, len(actualPartitionsForPeriod)) +} + +func TestUnitTestPartitionsForPeriodReturnsErrWhenTooManyPrefillDays(t *testing.T) { + // prepare + daysToPrefill := config.MaxMetricPartitioningPrefillPeriodDays + 1 + + // execute + _, err := PartitionsForPeriod(time.Now(), daysToPrefill) + + // assert + assert.NotNil(t, err) +} + +func TestUnitTestPartitionsForPeriodReturnsExpectedNumPartitionsWhenPrefillPeriodIsContainedInCurrentMonth(t *testing.T) { + // prepare + + // pick a date in the middle of a month + startFrom := time.Date(1989, 5, 11, 12, 0, 0, 0, time.UTC) + + // set prefill period to less than days remaining in month + // from above date + daysToPrefill := 3 + + // execute + actualPartitionsForPeriod, err := PartitionsForPeriod(startFrom, daysToPrefill) + + // assert + assert.Nil(t, err) + assert.Equal(t, daysToPrefill, len(actualPartitionsForPeriod)) +} diff --git a/clients/database/postgres/partition.go b/clients/database/postgres/partition.go new file mode 100644 index 0000000..4c1cfad --- /dev/null +++ b/clients/database/postgres/partition.go @@ -0,0 +1,260 @@ +package postgres + +import ( + "context" + "fmt" + "github.com/kava-labs/kava-proxy-service/config" + "math" + "strings" + "time" +) + +const ( + PartitionBaseTableName = "proxied_request_metrics" ) + +// PartitionPeriod represents a single postgres partitioned +// table from a starting point (inclusive of that point in time) +// to an end point (exclusive of that point in time) +type PartitionPeriod struct { + TableName string + InclusiveStartPeriod time.Time + ExclusiveEndPeriod time.Time +} + +// daysInMonth returns the number of days in a month +func daysInMonth(t time.Time) int { + y, m, _ := t.Date() + return time.Date(y, m+1, 0, 0, 0, 0, 0, time.UTC).Day() +} + +// PartitionsForPeriod attempts to generate the partitions +// to create when prefilling numDaysToPrefill, returning the list +// of partitions and error (if any) +func PartitionsForPeriod(start time.Time, numDaysToPrefill int) ([]PartitionPeriod, error) { + var partitionPeriods []PartitionPeriod + // check function constraints needed to ensure expected behavior + if numDaysToPrefill > config.MaxMetricPartitioningPrefillPeriodDays { + return partitionPeriods, fmt.Errorf("more than %d prefill days specified %d",
config.MaxMetricPartitioningPrefillPeriodDays, numDaysToPrefill) + } + + currentYear, currentMonth, currentDay := start.Date() + + daysInCurrentMonth := daysInMonth(start) + + // add one to include the current day + newDaysRemainingInCurrentMonth := daysInCurrentMonth - currentDay + 1 + + // total number of partitions to generate + totalPartitionsToGenerate := numDaysToPrefill + + partitionsToGenerateForCurrentMonth := int(math.Min(float64(newDaysRemainingInCurrentMonth), float64(numDaysToPrefill))) + + // generate partitions for current month + for partitionIndex := 0; partitionsToGenerateForCurrentMonth > 0; partitionIndex++ { + partitionPeriod := PartitionPeriod{ + TableName: fmt.Sprintf("%s_year_%d_month_%d_day_%d", PartitionBaseTableName, currentYear, currentMonth, currentDay+partitionIndex), + InclusiveStartPeriod: start.Add(time.Duration(partitionIndex) * 24 * time.Hour).Truncate(24 * time.Hour), + ExclusiveEndPeriod: start.Add(time.Duration(partitionIndex+1) * 24 * time.Hour).Truncate(24 * time.Hour), + } + + partitionPeriods = append(partitionPeriods, partitionPeriod) + + partitionsToGenerateForCurrentMonth-- + } + + // check to see if we need to create any partitions for the + // upcoming month + if totalPartitionsToGenerate > newDaysRemainingInCurrentMonth { + futureMonth := start.Add(time.Hour * 24 * time.Duration(newDaysRemainingInCurrentMonth+1)) + + nextYear, nextMonth, nextDay := futureMonth.Date() + + // on function entry we assert that pre-fill days won't + // overflow more than two unique months + // to generate partitions for + partitionsToGenerateForFutureMonth := totalPartitionsToGenerate - newDaysRemainingInCurrentMonth + + // generate partitions for future month + for partitionIndex := 0; partitionsToGenerateForFutureMonth > 0; partitionIndex++ { + partitionPeriod := PartitionPeriod{ + TableName: fmt.Sprintf("%s_year%d_month%d_day%d", PartitionBaseTableName, nextYear, nextMonth, nextDay+partitionIndex), + InclusiveStartPeriod: futureMonth.Add(time.Duration(partitionIndex) * 24 * time.Hour).Truncate(24 * time.Hour), + ExclusiveEndPeriod: futureMonth.Add(time.Duration(partitionIndex+1) * 24 * time.Hour).Truncate(24 * time.Hour), + } + + partitionPeriods = append(partitionPeriods, partitionPeriod) + + partitionsToGenerateForFutureMonth-- + } + } + + return partitionPeriods, nil +} + +// Partition attempts to create (idempotently) future partitions +// for storing proxied request metrics, returning error (if any) +func (c *Client) Partition(prefillPeriodDays int) error { + // calculate partition names and ranges to create + partitionsToCreate, err := PartitionsForPeriod(time.Now(), prefillPeriodDays) + + if err != nil { + return err + } + + c.logger.Trace().Msg(fmt.Sprintf("partitionsToCreate %+v", partitionsToCreate)) + + // create partition for each of those days + for _, partitionToCreate := range partitionsToCreate { + // do below in a transaction to allow retries + // on each run of the routine to smooth over any transient issues + // such as dropped database connection or rolling service updates + // and support safe concurrency of multiple instances of the service + // attempting to create partitions + // https://go.dev/doc/database/execute-transactions + tx, err := c.db.BeginTx(context.Background(), nil) + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s beginning transaction for partition %+v", err, partitionToCreate)) + + continue + } + + // check to see if partition already exists + _, err = tx.Exec(fmt.Sprintf("select * from %s limit 1;",
partitionToCreate.TableName)) + + if err != nil { + if !strings.Contains(err.Error(), "42P01") { + c.logger.Error().Msg(fmt.Sprintf("error %s querying for partition %+v", err, partitionToCreate)) + + tx.Rollback() + + continue + } + + // else error indicates table doesn't exist so safe for us to create it + createTableStatement := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s + (LIKE proxied_request_metrics INCLUDING DEFAULTS INCLUDING CONSTRAINTS); + `, partitionToCreate.TableName) + _, err = c.db.Exec(createTableStatement) + + if err != nil { + c.logger.Debug().Msg(fmt.Sprintf("error %s creating partition %+v using statement %s", err, partitionToCreate, createTableStatement)) + + err = tx.Rollback() + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s rolling back statement %s", err, createTableStatement)) + } + + continue + } + + // attach partitions to main table + attachPartitionStatement := fmt.Sprintf(` + ALTER TABLE proxied_request_metrics ATTACH PARTITION %s + FOR VALUES FROM ('%s') TO ('%s'); + `, partitionToCreate.TableName, partitionToCreate.InclusiveStartPeriod.Format("2006-01-02 15:04:05"), partitionToCreate.ExclusiveEndPeriod.Format("2006-01-02 15:04:05")) + _, err = c.db.Exec(attachPartitionStatement) + + if err != nil { + c.logger.Debug().Msg(fmt.Sprintf("error %s attaching partition %+v using statement %s", err, + partitionToCreate, attachPartitionStatement)) + + err = tx.Rollback() + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s rolling back statement %s", err, attachPartitionStatement)) + } + + continue + } + + err = tx.Commit() + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s committing transaction to create partition %+v", err, partitionToCreate)) + + continue + } + + c.logger.Trace().Msg(fmt.Sprintf("created partition %+v", partitionToCreate)) + + continue + } else { + // table exists, no need to create it + c.logger.Trace().Msg(fmt.Sprintf("not creating table for partition %+v as it already exists", partitionToCreate)) + + // but check if it is attached + partitionIsAttachedQuery := fmt.Sprintf(` + SELECT + nmsp_parent.nspname AS parent_schema, + parent.relname AS parent, + nmsp_child.nspname AS child_schema, + child.relname AS child + FROM pg_inherits + JOIN pg_class parent ON pg_inherits.inhparent = parent.oid + JOIN pg_class child ON pg_inherits.inhrelid = child.oid + JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace + JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace + WHERE parent.relname='proxied_request_metrics' and child.relname='%s';`, partitionToCreate.TableName) + result, err := c.db.Query(partitionIsAttachedQuery) + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s querying %s to see if partition %+v is already attached", err, partitionIsAttachedQuery, partitionToCreate)) + + continue + } + + if !result.Next() { + c.logger.Trace().Msg(fmt.Sprintf("attaching created but dangling partition %+v", partitionToCreate)) + // table is not attached, attach it + attachPartitionStatement := fmt.Sprintf(` + ALTER TABLE proxied_request_metrics ATTACH PARTITION %s + FOR VALUES FROM ('%s') TO ('%s'); + `, partitionToCreate.TableName, partitionToCreate.InclusiveStartPeriod.Format("2006-01-02 15:04:05"), partitionToCreate.ExclusiveEndPeriod.Format("2006-01-02 15:04:05")) + _, err = c.db.Exec(attachPartitionStatement) + + if err != nil { + c.logger.Debug().Msg(fmt.Sprintf("error %s attaching partition %+v using statement %s", err, + partitionToCreate, 
attachPartitionStatement)) + + err = tx.Rollback() + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s rolling back statement %s", err, attachPartitionStatement)) + } + + continue + } + + err = tx.Commit() + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s committing transaction to create partition %+v", err, partitionToCreate)) + + continue + } + + c.logger.Trace().Msg(fmt.Sprintf("created partition %+v", partitionToCreate)) + + continue + } + + result.Close() + + c.logger.Trace().Msg(fmt.Sprintf("not attaching partition %+v as it is already attached", partitionToCreate)) + + err = tx.Commit() + + if err != nil { + c.logger.Error().Msg(fmt.Sprintf("error %s committing empty transaction for already created partition %+v", err, partitionToCreate)) + } + + continue + } + } + + return nil +} diff --git a/clients/database/request_metric.go b/clients/database/postgres/request_metric.go similarity index 59% rename from clients/database/request_metric.go rename to clients/database/postgres/request_metric.go index 9b98c12..8ab72d1 100644 --- a/clients/database/request_metric.go +++ b/clients/database/postgres/request_metric.go @@ -1,47 +1,22 @@ -package database +package postgres import ( "context" + "database/sql" "fmt" - "time" - - "github.com/uptrace/bun" + "github.com/kava-labs/kava-proxy-service/clients/database" ) const ( ProxiedRequestMetricsTableName = "proxied_request_metrics" ) -// ProxiedRequestMetric contains request metrics for -// a single request proxied by the proxy service -type ProxiedRequestMetric struct { - bun.BaseModel `bun:"table:proxied_request_metrics,alias:prm"` - - ID int64 `bun:",pk,autoincrement"` - MethodName string - BlockNumber *int64 - ResponseLatencyMilliseconds int64 - Hostname string - RequestIP string `bun:"request_ip"` - RequestTime time.Time - UserAgent *string - Referer *string - Origin *string - ResponseBackend string - ResponseBackendRoute string - CacheHit bool - PartOfBatch bool -} - // Save saves the current ProxiedRequestMetric to // the database, returning error (if any). // If db is nil, returns nil error. -func (prm *ProxiedRequestMetric) Save(ctx context.Context, db *bun.DB) error { - if db == nil { - return nil - } - - _, err := db.NewInsert().Model(prm).Exec(ctx) +func (c *Client) SaveProxiedRequestMetric(ctx context.Context, metric *database.ProxiedRequestMetric) error { + prm := convertProxiedRequestMetric(metric) + _, err := c.db.NewInsert().Model(prm).Exec(ctx) return err } @@ -51,34 +26,31 @@ func (prm *ProxiedRequestMetric) Save(ctx context.Context, db *bun.DB) error { // error (if any) along with a cursor to use to fetch the next page // if the cursor is 0 no more pages exists. // Uses only in tests. If db is nil, returns empty slice and 0 cursor. 
-func ListProxiedRequestMetricsWithPagination(ctx context.Context, db *bun.DB, cursor int64, limit int) ([]ProxiedRequestMetric, int64, error) { - if db == nil { - return []ProxiedRequestMetric{}, 0, nil - } - +func (c *Client) ListProxiedRequestMetricsWithPagination(ctx context.Context, cursor int64, limit int) ([]*database.ProxiedRequestMetric, int64, error) { var proxiedRequestMetrics []ProxiedRequestMetric var nextCursor int64 - count, err := db.NewSelect().Model(&proxiedRequestMetrics).Where("ID > ?", cursor).Limit(limit).ScanAndCount(ctx) + count, err := c.db.NewSelect().Model(&proxiedRequestMetrics).Where("ID > ?", cursor).Limit(limit).ScanAndCount(ctx) // look up the id of the last if count == limit { nextCursor = proxiedRequestMetrics[count-1].ID } + metrics := make([]*database.ProxiedRequestMetric, 0, len(proxiedRequestMetrics)) + for _, metric := range proxiedRequestMetrics { + metrics = append(metrics, metric.ToProxiedRequestMetric()) + } + // otherwise leave nextCursor as 0 to signal no more rows - return proxiedRequestMetrics, nextCursor, err + return metrics, nextCursor, err } // CountAttachedProxiedRequestMetricPartitions returns the current // count of attached partitions for the ProxiedRequestMetricsTableName // and error (if any). // If db is nil, returns 0 and nil error. -func CountAttachedProxiedRequestMetricPartitions(ctx context.Context, db *bun.DB) (int64, error) { - if db == nil { - return 0, nil - } - +func (c *Client) CountAttachedProxiedRequestMetricPartitions(ctx context.Context) (int64, error) { var count int64 countPartitionsQuery := fmt.Sprintf(` @@ -90,7 +62,7 @@ func CountAttachedProxiedRequestMetricPartitions(ctx context.Context, db *bun.DB JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace WHERE parent.relname='%s';`, ProxiedRequestMetricsTableName) - row := db.QueryRow(countPartitionsQuery) + row := c.db.QueryRow(countPartitionsQuery) err := row.Scan(&count) if err != nil { @@ -104,11 +76,7 @@ func CountAttachedProxiedRequestMetricPartitions(ctx context.Context, db *bun.DB // GetLastCreatedAttachedProxiedRequestMetricsPartitionName gets the table name // for the last created (and attached) proxied request metrics partition // Used for status check. If db is nil, returns empty string and nil error. -func GetLastCreatedAttachedProxiedRequestMetricsPartitionName(ctx context.Context, db *bun.DB) (string, error) { - if db == nil { - return "", nil - } - +func (c *Client) GetLastCreatedAttachedProxiedRequestMetricsPartitionName(ctx context.Context) (string, error) { var lastCreatedAttachedPartitionName string lastCreatedAttachedPartitionNameQuery := fmt.Sprintf(` @@ -121,7 +89,7 @@ FROM pg_inherits JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace WHERE parent.relname='%s' order by child.oid desc limit 1;`, ProxiedRequestMetricsTableName) - row := db.QueryRow(lastCreatedAttachedPartitionNameQuery) + row := c.db.QueryRow(lastCreatedAttachedPartitionNameQuery) err := row.Scan(&lastCreatedAttachedPartitionName) if err != nil { @@ -136,12 +104,13 @@ WHERE parent.relname='%s' order by child.oid desc limit 1;`, ProxiedRequestMetri // all proxied request metrics older than the specified // days, returning error (if any). // Used during pruning process. If db is nil, returns nil error. 
-func DeleteProxiedRequestMetricsOlderThanNDays(ctx context.Context, db *bun.DB, n int64) error { - if db == nil { - return nil - } - - _, err := db.NewDelete().Model((*ProxiedRequestMetric)(nil)).Where(fmt.Sprintf("request_time < now() - interval '%d' day", n)).Exec(ctx) +func (c *Client) DeleteProxiedRequestMetricsOlderThanNDays(ctx context.Context, n int64) error { + _, err := c.db.NewDelete().Model((*ProxiedRequestMetric)(nil)).Where(fmt.Sprintf("request_time < now() - interval '%d' day", n)).Exec(ctx) return err } + +// Exec is not part of the database.MetricsDatabase interface; it is exposed on the concrete implementation for test purposes only. +func (c *Client) Exec(query string, args ...interface{}) (sql.Result, error) { + return c.db.Exec(query, args...) +} diff --git a/clients/database/postgres/request_metric_test.go b/clients/database/postgres/request_metric_test.go new file mode 100644 index 0000000..62507e6 --- /dev/null +++ b/clients/database/postgres/request_metric_test.go @@ -0,0 +1,44 @@ +package postgres + +import ( + "context" + "github.com/kava-labs/kava-proxy-service/clients/database" + "github.com/stretchr/testify/require" + "testing" +) + +func TestNoDatabaseSave(t *testing.T) { + db := &Client{} + + prm := &database.ProxiedRequestMetric{} + err := db.SaveProxiedRequestMetric(context.Background(), prm) + require.Error(t, err) +} + +func TestNoDatabaseListProxiedRequestMetricsWithPagination(t *testing.T) { + db := &Client{} + + _, _, err := db.ListProxiedRequestMetricsWithPagination(context.Background(), 0, 0) + require.Error(t, err) +} + +func TestNoDatabaseCountAttachedProxiedRequestMetricPartitions(t *testing.T) { + db := &Client{} + + _, err := db.CountAttachedProxiedRequestMetricPartitions(context.Background()) + require.Error(t, err) +} + +func TestGetLastCreatedAttachedProxiedRequestMetricsPartitionName(t *testing.T) { + db := &Client{} + + _, err := db.GetLastCreatedAttachedProxiedRequestMetricsPartitionName(context.Background()) + require.Error(t, err) +} + +func TestDeleteProxiedRequestMetricsOlderThanNDays(t *testing.T) { + db := &Client{} + + err := db.DeleteProxiedRequestMetricsOlderThanNDays(context.Background(), 0) + require.Error(t, err) +} diff --git a/clients/database/postgres/types.go b/clients/database/postgres/types.go new file mode 100644 index 0000000..4e17c5d --- /dev/null +++ b/clients/database/postgres/types.go @@ -0,0 +1,67 @@ +package postgres + +import ( + "time" + + "github.com/kava-labs/kava-proxy-service/clients/database" + "github.com/uptrace/bun" +) + +// ProxiedRequestMetric contains request metrics for +// a single request proxied by the proxy service +type ProxiedRequestMetric struct { + bun.BaseModel `bun:"table:proxied_request_metrics,alias:prm"` + + ID int64 `bun:",pk,autoincrement"` + MethodName string + BlockNumber *int64 + ResponseLatencyMilliseconds int64 + Hostname string + RequestIP string `bun:"request_ip"` + RequestTime time.Time + UserAgent *string + Referer *string + Origin *string + ResponseBackend string + ResponseBackendRoute string + CacheHit bool + PartOfBatch bool +} + +func (prm *ProxiedRequestMetric) ToProxiedRequestMetric() *database.ProxiedRequestMetric { + return &database.ProxiedRequestMetric{ + ID: prm.ID, + MethodName: prm.MethodName, + BlockNumber: prm.BlockNumber, + ResponseLatencyMilliseconds: prm.ResponseLatencyMilliseconds, + Hostname: prm.Hostname, + RequestIP: prm.RequestIP, + RequestTime: prm.RequestTime, + UserAgent: prm.UserAgent, + Referer: prm.Referer, + Origin: prm.Origin, + ResponseBackend:
prm.ResponseBackend, + ResponseBackendRoute: prm.ResponseBackendRoute, + CacheHit: prm.CacheHit, + PartOfBatch: prm.PartOfBatch, + } +} + +func convertProxiedRequestMetric(metric *database.ProxiedRequestMetric) *ProxiedRequestMetric { + return &ProxiedRequestMetric{ + ID: metric.ID, + MethodName: metric.MethodName, + BlockNumber: metric.BlockNumber, + ResponseLatencyMilliseconds: metric.ResponseLatencyMilliseconds, + Hostname: metric.Hostname, + RequestIP: metric.RequestIP, + RequestTime: metric.RequestTime, + UserAgent: metric.UserAgent, + Referer: metric.Referer, + Origin: metric.Origin, + ResponseBackend: metric.ResponseBackend, + ResponseBackendRoute: metric.ResponseBackendRoute, + CacheHit: metric.CacheHit, + PartOfBatch: metric.PartOfBatch, + } +} diff --git a/clients/database/postgres_test.go b/clients/database/postgres_test.go deleted file mode 100644 index 2cbbb7e..0000000 --- a/clients/database/postgres_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package database - -import ( - "github.com/stretchr/testify/require" - "testing" -) - -func TestDisabledDBCreation(t *testing.T) { - config := PostgresDatabaseConfig{ - DatabaseDisabled: true, - } - db, err := NewPostgresClient(config) - require.NoError(t, err) - require.True(t, db.isDisabled) -} - -func TestHealthcheckNoDatabase(t *testing.T) { - config := PostgresDatabaseConfig{ - DatabaseDisabled: true, - } - db, err := NewPostgresClient(config) - require.NoError(t, err) - err = db.HealthCheck() - require.NoError(t, err) -} diff --git a/clients/database/request_metric_test.go b/clients/database/request_metric_test.go deleted file mode 100644 index 37afe30..0000000 --- a/clients/database/request_metric_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package database - -import ( - "context" - "github.com/stretchr/testify/require" - "testing" -) - -func TestNoDatabaseSave(t *testing.T) { - prm := ProxiedRequestMetric{} - err := prm.Save(context.Background(), nil) - require.NoError(t, err) -} - -func TestNoDatabaseListProxiedRequestMetricsWithPagination(t *testing.T) { - proxiedRequestMetrics, cursor, err := ListProxiedRequestMetricsWithPagination(context.Background(), nil, 0, 0) - require.NoError(t, err) - require.Empty(t, proxiedRequestMetrics) - require.Zero(t, cursor) -} - -func TestNoDatabaseCountAttachedProxiedRequestMetricPartitions(t *testing.T) { - count, err := CountAttachedProxiedRequestMetricPartitions(context.Background(), nil) - require.NoError(t, err) - require.Zero(t, count) -} - -func TestGetLastCreatedAttachedProxiedRequestMetricsPartitionName(t *testing.T) { - partitionName, err := GetLastCreatedAttachedProxiedRequestMetricsPartitionName(context.Background(), nil) - require.NoError(t, err) - require.Empty(t, partitionName) -} - -func TestDeleteProxiedRequestMetricsOlderThanNDays(t *testing.T) { - err := DeleteProxiedRequestMetricsOlderThanNDays(context.Background(), nil, 0) - require.NoError(t, err) -} diff --git a/main_batch_test.go b/main_batch_test.go index 672273d..adb7872 100644 --- a/main_batch_test.go +++ b/main_batch_test.go @@ -4,13 +4,13 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/kava-labs/kava-proxy-service/clients/database/postgres" "io" "net/http" "strconv" "testing" "time" - "github.com/kava-labs/kava-proxy-service/clients/database" "github.com/kava-labs/kava-proxy-service/decode" "github.com/kava-labs/kava-proxy-service/service/cachemdw" "github.com/redis/go-redis/v9" @@ -44,7 +44,7 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) { cleanUpRedis(t, redisClient) expectKeysNum(t, redisClient, 0) - 
db, err := database.NewPostgresClient(databaseConfig) + db, err := postgres.NewClient(databaseConfig) require.NoError(t, err) cleanMetricsDb(t, db) diff --git a/main_test.go b/main_test.go index 5ad4b40..d098290 100644 --- a/main_test.go +++ b/main_test.go @@ -7,6 +7,8 @@ import ( "encoding/json" "errors" "fmt" + "github.com/kava-labs/kava-proxy-service/clients/database" + "github.com/kava-labs/kava-proxy-service/clients/database/postgres" "io" "log" "math/big" @@ -27,7 +29,6 @@ import ( "github.com/redis/go-redis/v9" "github.com/stretchr/testify/require" - "github.com/kava-labs/kava-proxy-service/clients/database" "github.com/kava-labs/kava-proxy-service/config" "github.com/kava-labs/kava-proxy-service/decode" "github.com/kava-labs/kava-proxy-service/logging" @@ -70,7 +71,7 @@ var ( databasePassword = os.Getenv("DATABASE_PASSWORD") databaseUsername = os.Getenv("DATABASE_USERNAME") databaseName = os.Getenv("DATABASE_NAME") - databaseConfig = database.PostgresDatabaseConfig{ + databaseConfig = postgres.DatabaseConfig{ DatabaseName: databaseName, DatabaseEndpointURL: databaseURL, DatabaseUsername: databaseUsername, @@ -88,22 +89,22 @@ var ( // lookup all the request metrics in the database paging as necessary // search for any request metrics between startTime and time.Now() for particular request methods // if testedmethods is empty, all metrics in timeframe are returned. -func findMetricsInWindowForMethods(db database.PostgresClient, startTime time.Time, testedmethods []string) []database.ProxiedRequestMetric { +func findMetricsInWindowForMethods(db postgres.Client, startTime time.Time, testedmethods []string) []*database.ProxiedRequestMetric { extension := time.Duration(testExtendMetricWindowMs) * time.Millisecond // add small buffer into future in case metrics are still being created endTime := time.Now().Add(extension) var nextCursor int64 - var proxiedRequestMetrics []database.ProxiedRequestMetric + var proxiedRequestMetrics []*database.ProxiedRequestMetric - proxiedRequestMetricsPage, nextCursor, err := database.ListProxiedRequestMetricsWithPagination(testContext, db.DB, nextCursor, 10000) + proxiedRequestMetricsPage, nextCursor, err := db.ListProxiedRequestMetricsWithPagination(testContext, nextCursor, 10000) if err != nil { panic(err) } proxiedRequestMetrics = proxiedRequestMetricsPage for nextCursor != 0 { - proxiedRequestMetricsPage, nextCursor, err = database.ListProxiedRequestMetricsWithPagination(testContext, db.DB, nextCursor, 10000) + proxiedRequestMetricsPage, nextCursor, err = db.ListProxiedRequestMetricsWithPagination(testContext, nextCursor, 10000) if err != nil { panic(err) } @@ -111,7 +112,7 @@ func findMetricsInWindowForMethods(db database.PostgresClient, startTime time.Ti proxiedRequestMetrics = append(proxiedRequestMetrics, proxiedRequestMetricsPage...) 
} - var requestMetricsDuringRequestWindow []database.ProxiedRequestMetric + var requestMetricsDuringRequestWindow []*database.ProxiedRequestMetric // iterate in reverse order to start checking the most recent request metrics first for i := len(proxiedRequestMetrics) - 1; i >= 0; i-- { requestMetric := proxiedRequestMetrics[i] @@ -143,10 +144,10 @@ func findMetricsInWindowForMethods(db database.PostgresClient, startTime time.Ti func waitForMetricsInWindow( t *testing.T, expected int, - db database.PostgresClient, + db postgres.Client, startTime time.Time, testedmethods []string, -) (metrics []database.ProxiedRequestMetric) { +) (metrics []*database.ProxiedRequestMetric) { timeoutMin := 2 * time.Second // scale the timeout by the number of expected requests, or at least 1 second timeout := time.Duration(expected+1)*100*time.Millisecond + time.Second @@ -202,7 +203,7 @@ func TestE2ETestProxyCreatesRequestMetricForEachRequest(t *testing.T) { require.NoError(t, err) - databaseClient, err := database.NewPostgresClient(databaseConfig) + databaseClient, err := postgres.NewClient(databaseConfig) require.NoError(t, err) @@ -238,7 +239,7 @@ func TestE2ETestProxyTracksBlockNumberForEth_getBlockByNumberRequest(t *testing. require.NoError(t, err) - databaseClient, err := database.NewPostgresClient(databaseConfig) + databaseClient, err := postgres.NewClient(databaseConfig) require.NoError(t, err) @@ -281,7 +282,7 @@ func TestE2ETestProxyTracksBlockTagForEth_getBlockByNumberRequest(t *testing.T) require.NoError(t, err) - databaseClient, err := database.NewPostgresClient(databaseConfig) + databaseClient, err := postgres.NewClient(databaseConfig) require.NoError(t, err) @@ -319,7 +320,7 @@ func TestE2ETestProxyTracksBlockNumberForMethodsWithBlockNumberParam(t *testing. 
client, err := ethclient.Dial(proxyServiceURL) require.NoError(t, err) - databaseClient, err := database.NewPostgresClient(databaseConfig) + databaseClient, err := postgres.NewClient(databaseConfig) require.NoError(t, err) // get the latest queryable block number @@ -385,7 +386,7 @@ func TestE2ETestProxyTracksBlockNumberForMethodsWithBlockHashParam(t *testing.T) require.NoError(t, err) - databaseClient, err := database.NewPostgresClient(databaseConfig) + databaseClient, err := postgres.NewClient(databaseConfig) require.NoError(t, err) @@ -438,7 +439,7 @@ func TestE2ETest_HeightBasedRouting(t *testing.T) { rpc, err := rpc.Dial(proxyServiceURL) require.NoError(t, err) - databaseClient, err := database.NewPostgresClient(databaseConfig) + databaseClient, err := postgres.NewClient(databaseConfig) require.NoError(t, err) testCases := []struct { @@ -676,7 +677,7 @@ func containsHeaders(t *testing.T, headersMap1, headersMap2 http.Header, msg str func TestE2ETestCachingMdwWithBlockNumberParam_Metrics(t *testing.T) { client, err := ethclient.Dial(proxyServiceURL) require.NoError(t, err) - db, err := database.NewPostgresClient(databaseConfig) + db, err := postgres.NewClient(databaseConfig) require.NoError(t, err) redisClient := redis.NewClient(&redis.Options{ @@ -1216,7 +1217,7 @@ func cleanUpRedis(t *testing.T, redisClient *redis.Client) { } } -func cleanMetricsDb(t *testing.T, db database.PostgresClient) { +func cleanMetricsDb(t *testing.T, db postgres.Client) { if shouldSkipMetrics() { return } diff --git a/routines/metric_compaction.go b/routines/metric_compaction.go index 5a2d55a..d949bae 100644 --- a/routines/metric_compaction.go +++ b/routines/metric_compaction.go @@ -5,10 +5,10 @@ package routines import ( "fmt" + "github.com/kava-labs/kava-proxy-service/clients/database" "time" "github.com/google/uuid" - "github.com/kava-labs/kava-proxy-service/clients/database" "github.com/kava-labs/kava-proxy-service/logging" ) @@ -16,7 +16,7 @@ import ( // for creating a new metric compaction routine type MetricCompactionRoutineConfig struct { Interval time.Duration - Database *database.PostgresClient + Database database.MetricsDatabase Logger logging.ServiceLogger } @@ -26,7 +26,7 @@ type MetricCompactionRoutineConfig struct { type MetricCompactionRoutine struct { id string interval time.Duration - *database.PostgresClient + db database.MetricsDatabase logging.ServiceLogger } @@ -52,9 +52,9 @@ func (mcr *MetricCompactionRoutine) Run() (<-chan error, error) { // using the provided config, returning the routine and error (if any) func NewMetricCompactionRoutine(config MetricCompactionRoutineConfig) (*MetricCompactionRoutine, error) { return &MetricCompactionRoutine{ - id: uuid.New().String(), - interval: config.Interval, - PostgresClient: config.Database, - ServiceLogger: config.Logger, + id: uuid.New().String(), + interval: config.Interval, + db: config.Database, + ServiceLogger: config.Logger, }, nil } diff --git a/routines/metric_partitioning.go b/routines/metric_partitioning.go index 9d96026..ec62da8 100644 --- a/routines/metric_partitioning.go +++ b/routines/metric_partitioning.go @@ -1,29 +1,21 @@ package routines import ( - "context" "fmt" - "math" - "strings" + "github.com/kava-labs/kava-proxy-service/clients/database" "time" "github.com/google/uuid" - "github.com/kava-labs/kava-proxy-service/clients/database" - "github.com/kava-labs/kava-proxy-service/config" "github.com/kava-labs/kava-proxy-service/logging" ) -const ( - PartitionBaseTableName = "proxied_request_metrics" -) - // 
MetricPartitioningRoutineConfig wraps values used // for creating a new metric partitioning routine type MetricPartitioningRoutineConfig struct { Interval time.Duration DelayFirstRun time.Duration PrefillPeriodDays int - Database *database.PostgresClient + Database database.MetricsDatabase Logger logging.ServiceLogger } @@ -35,7 +27,7 @@ type MetricPartitioningRoutine struct { interval time.Duration delayFirstRun time.Duration prefillPeriodDays int - *database.PostgresClient + db database.MetricsDatabase logging.ServiceLogger } @@ -49,8 +41,7 @@ func (mcr *MetricPartitioningRoutine) Run() (<-chan error, error) { time.Sleep(mcr.delayFirstRun) - err := mcr.partition() - + err := mcr.db.Partition(mcr.prefillPeriodDays) if err != nil { errorChannel <- err } @@ -62,7 +53,7 @@ func (mcr *MetricPartitioningRoutine) Run() (<-chan error, error) { for tick := range timer { mcr.Trace().Msg(fmt.Sprintf("%s tick at %+v", mcr.id, tick)) - err := mcr.partition() + err := mcr.db.Partition(mcr.prefillPeriodDays) if err != nil { errorChannel <- err @@ -81,253 +72,7 @@ func NewMetricPartitioningRoutine(config MetricPartitioningRoutineConfig) (*Metr interval: config.Interval, delayFirstRun: config.DelayFirstRun, prefillPeriodDays: config.PrefillPeriodDays, - PostgresClient: config.Database, + db: config.Database, ServiceLogger: config.Logger, }, nil } - -// PartitionPeriod represents a single postgres partitioned -// table from a starting point (inclusive of that point in time) -// to an end point (exclusive of that point in time) -type PartitionPeriod struct { - TableName string - InclusiveStartPeriod time.Time - ExclusiveEndPeriod time.Time -} - -// daysInMonth returns the number of days in a month -func daysInMonth(t time.Time) int { - y, m, _ := t.Date() - return time.Date(y, m+1, 0, 0, 0, 0, 0, time.UTC).Day() -} - -// partitionsForPeriod attempts to generate the partitions -// to create when prefilling numDaysToPrefill, returning the list of -// of partitions and error (if any) -func partitionsForPeriod(start time.Time, numDaysToPrefill int) ([]PartitionPeriod, error) { - var partitionPeriods []PartitionPeriod - // check function constraints needed to ensure expected behavior - if numDaysToPrefill > config.MaxMetricPartitioningPrefillPeriodDays { - return partitionPeriods, fmt.Errorf("more than %d prefill days specified %d", config.MaxMetricPartitioningPrefillPeriodDays, numDaysToPrefill) - } - - currentYear, currentMonth, currentDay := start.Date() - - daysInCurrentMonth := daysInMonth(start) - - // add one to include the current day - newDaysRemainingInCurrentMonth := daysInCurrentMonth - currentDay + 1 - - // generate partitions for current month - totalPartitionsToGenerate := numDaysToPrefill - - partitionsToGenerateForCurrentMonth := int(math.Min(float64(newDaysRemainingInCurrentMonth), float64(numDaysToPrefill))) - - // generate partitions for current month - for partitionIndex := 0; partitionsToGenerateForCurrentMonth > 0; partitionIndex++ { - partitionPeriod := PartitionPeriod{ - TableName: fmt.Sprintf("%s_year_%d_month_%d_day_%d", PartitionBaseTableName, currentYear, currentMonth, currentDay+partitionIndex), - InclusiveStartPeriod: start.Add(time.Duration(partitionIndex) * 24 * time.Hour).Truncate(24 * time.Hour), - ExclusiveEndPeriod: start.Add(time.Duration(partitionIndex+1) * 24 * time.Hour).Truncate(24 * time.Hour), - } - - partitionPeriods = append(partitionPeriods, partitionPeriod) - - partitionsToGenerateForCurrentMonth-- - } - - // check to see if we need to create any partitions for 
the - // upcoming month - if totalPartitionsToGenerate > newDaysRemainingInCurrentMonth { - futureMonth := start.Add(time.Hour * 24 * time.Duration(newDaysRemainingInCurrentMonth+1)) - - nextYear, nextMonth, nextDay := futureMonth.Date() - - // on function entry we assert that pre-fill days won't - // overflow more than two unique months - // to generate partitions for - partitionsToGenerateForFutureMonth := totalPartitionsToGenerate - newDaysRemainingInCurrentMonth - - // generate partitions for future month - for partitionIndex := 0; partitionsToGenerateForFutureMonth > 0; partitionIndex++ { - partitionPeriod := PartitionPeriod{ - TableName: fmt.Sprintf("%s_year%d_month%d_day%d", PartitionBaseTableName, nextYear, nextMonth, nextDay+partitionIndex), - InclusiveStartPeriod: futureMonth.Add(time.Duration(partitionIndex) * 24 * time.Hour).Truncate(24 * time.Hour), - ExclusiveEndPeriod: futureMonth.Add(time.Duration(partitionIndex+1) * 24 * time.Hour).Truncate(24 * time.Hour), - } - - partitionPeriods = append(partitionPeriods, partitionPeriod) - - partitionsToGenerateForFutureMonth-- - } - } - - return partitionPeriods, nil -} - -// partition attempts to create (idempotently) future partitions -// for storing proxied request metrics, returning error (if any) -func (mcr *MetricPartitioningRoutine) partition() error { - // calculate partition name and ranges to create - partitionsToCreate, err := partitionsForPeriod(time.Now(), mcr.prefillPeriodDays) - - if err != nil { - return err - } - - mcr.Trace().Msg(fmt.Sprintf("partitionsToCreate %+v", partitionsToCreate)) - - // create partition for each of those days - for _, partitionToCreate := range partitionsToCreate { - // do below in a transaction to allow retries - // each run of the routine to smooth any over transient issues - // such as dropped database connection or rolling service updates - // and support safe concurrency of multiple instances of the service - // attempting to create partitions - // https://go.dev/doc/database/execute-transactions - tx, err := mcr.DB.BeginTx(context.Background(), nil) - - if err != nil { - mcr.Error().Msg(fmt.Sprintf("error %s beginning transaction for partition %+v", err, partitionToCreate)) - - continue - } - - // check to see if partition already exists - _, err = tx.Exec(fmt.Sprintf("select * from %s limit 1;", partitionToCreate.TableName)) - - if err != nil { - if !strings.Contains(err.Error(), "42P01") { - mcr.Error().Msg(fmt.Sprintf("error %s querying for partition %+v", err, partitionToCreate)) - - tx.Rollback() - - continue - } - - // else error indicates table doesn't exist so safe for us to create it - createTableStatement := fmt.Sprintf(` - CREATE TABLE IF NOT EXISTS %s - (LIKE proxied_request_metrics INCLUDING DEFAULTS INCLUDING CONSTRAINTS); - `, partitionToCreate.TableName) - _, err = mcr.DB.Exec(createTableStatement) - - if err != nil { - mcr.Debug().Msg(fmt.Sprintf("error %s creating partition %+v using statement %s", err, partitionToCreate, createTableStatement)) - - err = tx.Rollback() - - if err != nil { - mcr.Error().Msg(fmt.Sprintf("error %s rolling back statement %s", err, createTableStatement)) - } - - continue - } - - // attach partitions to main table - attachPartitionStatement := fmt.Sprintf(` - ALTER TABLE proxied_request_metrics ATTACH PARTITION %s - FOR VALUES FROM ('%s') TO ('%s'); - `, partitionToCreate.TableName, partitionToCreate.InclusiveStartPeriod.Format("2006-01-02 15:04:05"), partitionToCreate.ExclusiveEndPeriod.Format("2006-01-02 15:04:05")) - _, err = 
diff --git a/routines/metric_pruning.go b/routines/metric_pruning.go
index 6586c7e..5110ea8 100644
--- a/routines/metric_pruning.go
+++ b/routines/metric_pruning.go
@@ -6,10 +6,10 @@ package routines
 import (
 	"context"
 	"fmt"
+	"github.com/kava-labs/kava-proxy-service/clients/database"
 	"time"
 
 	"github.com/google/uuid"
-	"github.com/kava-labs/kava-proxy-service/clients/database"
 	"github.com/kava-labs/kava-proxy-service/logging"
 )
 
@@ -19,7 +19,7 @@ type MetricPruningRoutineConfig struct {
 	Interval                     time.Duration
 	StartDelay                   time.Duration
 	MaxRequestMetricsHistoryDays int64
-	Database                     *database.PostgresClient
+	Database                     database.MetricsDatabase
 	Logger                       logging.ServiceLogger
 }
 
@@ -31,7 +31,7 @@ type MetricPruningRoutine struct {
 	interval                     time.Duration
 	startDelay                   time.Duration
 	maxRequestMetricsHistoryDays int64
-	*database.PostgresClient
+	Database                     database.MetricsDatabase
 	logging.ServiceLogger
 }
 
@@ -50,7 +50,7 @@ func (mpr *MetricPruningRoutine) Run() (<-chan error, error) {
 		for tick := range timer {
 			mpr.Trace().Msg(fmt.Sprintf("%s tick at %+v", mpr.id, tick))
 
-			database.DeleteProxiedRequestMetricsOlderThanNDays(context.Background(), mpr.DB, mpr.maxRequestMetricsHistoryDays)
+			mpr.Database.DeleteProxiedRequestMetricsOlderThanNDays(context.Background(), mpr.maxRequestMetricsHistoryDays)
 		}
 
 	}()
 
@@ -65,7 +65,7 @@ func NewMetricPruningRoutine(config MetricPruningRoutineConfig) (*MetricPruningR
 		interval:                     config.Interval,
 		startDelay:                   config.StartDelay,
 		maxRequestMetricsHistoryDays: config.MaxRequestMetricsHistoryDays,
-		PostgresClient:               config.Database,
+		Database:                     config.Database,
 		ServiceLogger:                config.Logger,
 	}, nil
 }
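Taken together, the call sites touched by this patch imply the shape of the new interface in clients/database/interface.go. A sketch reconstructed from those calls; return types not visible in the diff (such as the partition count) are assumptions:

package database

import "context"

// MetricsDatabase as implied by this patch's call sites; the
// authoritative definition lives in clients/database/interface.go.
type MetricsDatabase interface {
	// middleware: service.Database.SaveProxiedRequestMetric(ctx, metric)
	SaveProxiedRequestMetric(ctx context.Context, metric *ProxiedRequestMetric) error

	// pruning routine: mpr.Database.DeleteProxiedRequestMetricsOlderThanNDays(ctx, n)
	DeleteProxiedRequestMetricsOlderThanNDays(ctx context.Context, n int64) error

	// partitioning routine: mcr.db.Partition(mcr.prefillPeriodDays)
	Partition(prefillPeriodDays int) error

	// database status handler
	CountAttachedProxiedRequestMetricPartitions(ctx context.Context) (int64, error) // count type assumed
	GetLastCreatedAttachedProxiedRequestMetricsPartitionName(ctx context.Context) (string, error)
}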
"github.com/stretchr/testify/assert" + "os" + "testing" + "time" ) var ( @@ -38,7 +38,7 @@ func TestE2ETestMetricPartitioningRoutinePrefillsExpectedPartitionsAfterStartupD // prepare time.Sleep(time.Duration(MetricPartitioningRoutineDelayFirstRunSeconds) * time.Second) - expectedPartitions, err := partitionsForPeriod(time.Now().UTC(), int(configuredPrefillDays)) + expectedPartitions, err := postgres.PartitionsForPeriod(time.Now().UTC(), int(configuredPrefillDays)) assert.Nil(t, err) @@ -51,53 +51,6 @@ func TestE2ETestMetricPartitioningRoutinePrefillsExpectedPartitionsAfterStartupD assert.Equal(t, expectedPartitions[len(expectedPartitions)-1].TableName, databaseStatus.LatestProxiedRequestMetricPartitionTableName) } -func TestUnitTestpartitionsForPeriodReturnsErrWhenTooManyPrefillDays(t *testing.T) { - // prepare - daysToPrefill := config.MaxMetricPartitioningPrefillPeriodDays + 1 - - // execute - _, err := partitionsForPeriod(time.Now(), daysToPrefill) - - // assert - assert.NotNil(t, err) -} - -func TestUnitTestpartitionsForPeriodReturnsExpectedNumPartitionsWhenPrefillPeriodIsContainedInCurrentMonth(t *testing.T) { - // prepare - - // pick a date in the middle of a month - startFrom := time.Date(1989, 5, 11, 12, 0, 0, 0, time.UTC) - - // set prefill period to less then days remaining in month - // from above date - daysToPrefill := 3 - - // execute - actualPartitionsForPeriod, err := partitionsForPeriod(startFrom, daysToPrefill) - - // assert - assert.Nil(t, err) - assert.Equal(t, daysToPrefill, len(actualPartitionsForPeriod)) -} - -func TestUnitTestpartitionsForPeriodReturnsExpectedNumPartitionsWhenPrefillPeriodIsNotContainedInCurrentMonth(t *testing.T) { - // prepare - - // pick a date in the middle of a month - startFrom := time.Date(1989, 5, 20, 12, 0, 0, 0, time.UTC) - - // set prefill period to more then days remaining in month - // from above date - daysToPrefill := 21 - - // execute - actualPartitionsForPeriod, err := partitionsForPeriod(startFrom, daysToPrefill) - - // assert - assert.Nil(t, err) - assert.Equal(t, daysToPrefill, len(actualPartitionsForPeriod)) -} - func shouldSkipMetrics() bool { // Check if the environment variable SKIP_METRICS is set to "true" return os.Getenv("SKIP_METRICS") == "true" diff --git a/routines/metric_pruning.go b/routines/metric_pruning.go index 6586c7e..5110ea8 100644 --- a/routines/metric_pruning.go +++ b/routines/metric_pruning.go @@ -6,10 +6,10 @@ package routines import ( "context" "fmt" + "github.com/kava-labs/kava-proxy-service/clients/database" "time" "github.com/google/uuid" - "github.com/kava-labs/kava-proxy-service/clients/database" "github.com/kava-labs/kava-proxy-service/logging" ) @@ -19,7 +19,7 @@ type MetricPruningRoutineConfig struct { Interval time.Duration StartDelay time.Duration MaxRequestMetricsHistoryDays int64 - Database *database.PostgresClient + Database database.MetricsDatabase Logger logging.ServiceLogger } @@ -31,7 +31,7 @@ type MetricPruningRoutine struct { interval time.Duration startDelay time.Duration maxRequestMetricsHistoryDays int64 - *database.PostgresClient + Database database.MetricsDatabase logging.ServiceLogger } @@ -50,7 +50,7 @@ func (mpr *MetricPruningRoutine) Run() (<-chan error, error) { for tick := range timer { mpr.Trace().Msg(fmt.Sprintf("%s tick at %+v", mpr.id, tick)) - database.DeleteProxiedRequestMetricsOlderThanNDays(context.Background(), mpr.DB, mpr.maxRequestMetricsHistoryDays) + mpr.Database.DeleteProxiedRequestMetricsOlderThanNDays(context.Background(), mpr.maxRequestMetricsHistoryDays) } 
diff --git a/service/middleware.go b/service/middleware.go
index 8df5d9b..2d0cd52 100644
--- a/service/middleware.go
+++ b/service/middleware.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"fmt"
+	"github.com/kava-labs/kava-proxy-service/clients/database"
 	"io"
 	"net/http"
 	"strings"
@@ -11,7 +12,6 @@ import (
 
 	"github.com/urfave/negroni"
 
-	"github.com/kava-labs/kava-proxy-service/clients/database"
 	"github.com/kava-labs/kava-proxy-service/config"
 	"github.com/kava-labs/kava-proxy-service/decode"
 	"github.com/kava-labs/kava-proxy-service/logging"
@@ -492,7 +492,7 @@ func createAfterProxyFinalizer(service *ProxyService, config config.Config) http
 		isCached := cachemdw.IsRequestCached(r.Context())
 
 		// create a metric for the request
-		metric := database.ProxiedRequestMetric{
+		metric := &database.ProxiedRequestMetric{
 			MethodName:                  decodedRequestBody.Method,
 			ResponseLatencyMilliseconds: originRoundtripLatencyMilliseconds,
 			RequestTime:                 requestStartTime,
@@ -511,7 +511,7 @@ func createAfterProxyFinalizer(service *ProxyService, config config.Config) http
 		// save metric to database async
 		go func() {
 			// using background context so save won't be terminated when request finishes
-			err = metric.Save(context.Background(), service.Database.DB)
+			err = service.Database.SaveProxiedRequestMetric(context.Background(), metric)
 
 			if err != nil {
 				// TODO: consider only logging
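The finalizer now hands the metric to the interface instead of calling Save on the struct, which is also why the metric is built as a pointer above. A condensed sketch of that async-save pattern; the helper name, parameters, and field types here are illustrative, with only the fields visible in this hunk retained:

package example

import (
	"context"
	"time"

	"github.com/kava-labs/kava-proxy-service/clients/database"
	"github.com/kava-labs/kava-proxy-service/logging"
)

// recordMetric mirrors the finalizer's behavior: the goroutine uses
// context.Background() so the write is not canceled when the HTTP
// request's own context ends.
func recordMetric(db database.MetricsDatabase, logger *logging.ServiceLogger, method string, latencyMs int64, start time.Time) {
	metric := &database.ProxiedRequestMetric{
		MethodName:                  method,
		ResponseLatencyMilliseconds: latencyMs,
		RequestTime:                 start,
	}

	go func() {
		if err := db.SaveProxiedRequestMetric(context.Background(), metric); err != nil {
			// failures are only logged so a failed insert never
			// fails the proxied request itself
			logger.Error().Msg(err.Error())
		}
	}()
}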
diff --git a/service/service.go b/service/service.go
index fc0a20a..653fcf3 100644
--- a/service/service.go
+++ b/service/service.go
@@ -5,14 +5,16 @@ package service
 import (
 	"context"
 	"fmt"
+	"github.com/kava-labs/kava-proxy-service/clients/database"
+	"github.com/kava-labs/kava-proxy-service/clients/database/empty"
+	"github.com/kava-labs/kava-proxy-service/clients/database/postgres"
+	"github.com/kava-labs/kava-proxy-service/clients/database/postgres/migrations"
 	"net/http"
 	"time"
 
 	"github.com/ethereum/go-ethereum/ethclient"
 	"github.com/kava-labs/kava-proxy-service/clients/cache"
-	"github.com/kava-labs/kava-proxy-service/clients/database"
-	"github.com/kava-labs/kava-proxy-service/clients/database/migrations"
 	"github.com/kava-labs/kava-proxy-service/config"
 	"github.com/kava-labs/kava-proxy-service/logging"
 	"github.com/kava-labs/kava-proxy-service/service/batchmdw"
@@ -21,7 +23,7 @@ import (
 
 // ProxyService represents an instance of the proxy service API
 type ProxyService struct {
-	Database  *database.PostgresClient
+	Database  database.MetricsDatabase
 	Cache     *cachemdw.ServiceCache
 	httpProxy *http.Server
 	evmClient *ethclient.Client
@@ -32,10 +34,19 @@ type ProxyService struct {
 func New(ctx context.Context, config config.Config, serviceLogger *logging.ServiceLogger) (ProxyService, error) {
 	service := ProxyService{}
 
-	// create database client
-	db, err := createDatabaseClient(ctx, config, serviceLogger)
-	if err != nil {
-		return ProxyService{}, err
+	var (
+		db  database.MetricsDatabase
+		err error
+	)
+
+	if config.MetricDatabaseEnabled {
+		// create database client
+		db, err = createDatabaseClient(ctx, config, serviceLogger)
+		if err != nil {
+			return ProxyService{}, err
+		}
+	} else {
+		db = empty.New()
 	}
 
 	// create evm api client
@@ -140,9 +151,8 @@ func New(ctx context.Context, config config.Config, serviceLogger *logging.Servi
 // using the specified config and runs migrations async
 // (only if migration flag in config is true)
 // returning the database connection and error (if any)
-func createDatabaseClient(ctx context.Context, config config.Config, logger *logging.ServiceLogger) (*database.PostgresClient, error) {
-	databaseConfig := database.PostgresDatabaseConfig{
-		DatabaseDisabled:    !config.MetricDatabaseEnabled,
+func createDatabaseClient(ctx context.Context, config config.Config, logger *logging.ServiceLogger) (*postgres.Client, error) {
+	databaseConfig := postgres.DatabaseConfig{
 		DatabaseName:        config.DatabaseName,
 		DatabaseEndpointURL: config.DatabaseEndpointURL,
 		DatabaseUsername:    config.DatabaseUserName,
@@ -158,12 +168,12 @@ func createDatabaseClient(ctx context.Context, config config.Config, logger *log
 		RunDatabaseMigrations: config.RunDatabaseMigrations,
 	}
 
-	serviceDatabase, err := database.NewPostgresClient(databaseConfig)
+	serviceDatabase, err := postgres.NewClient(databaseConfig)
 	if err != nil {
 		logger.Error().Msg(fmt.Sprintf("error %s creating database using config %+v", err, databaseConfig))
 
-		return &database.PostgresClient{}, err
+		return &postgres.Client{}, err
 	}
 
 	if !databaseConfig.RunDatabaseMigrations {
@@ -196,7 +206,7 @@ func createDatabaseClient(ctx context.Context, config config.Config, logger *log
 
 	logger.Debug().Msg("running migrations on database")
 
-	migrations, err := database.Migrate(ctx, serviceDatabase.DB, *migrations.Migrations, logger)
+	migrations, err := serviceDatabase.Migrate(ctx, *migrations.Migrations, logger)
 	if err != nil {
 		// TODO: retry based on config