From 79316b09f07bbdc5e0a6e52a99071e9fe69ea8dc Mon Sep 17 00:00:00 2001 From: Ada Lundhe Date: Thu, 31 Oct 2024 13:56:07 -0500 Subject: [PATCH 1/4] AL: refactor tests and add sqlite --- impl/config/config.go | 2 +- impl/config/config.toml | 2 +- impl/go.mod | 1 + impl/go.sum | 1 + impl/pkg/server/server_test.go | 2 +- impl/pkg/service/dht_test.go | 2 +- impl/pkg/storage/db/postgres/db.go | 2 +- impl/pkg/storage/db/postgres/models.go | 2 +- impl/pkg/storage/db/postgres/queries.sql.go | 2 +- impl/pkg/storage/db/sqlite/db.go | 31 +++ .../00001_create_dht_records_table.sql | 17 ++ impl/pkg/storage/db/sqlite/models.go | 18 ++ impl/pkg/storage/db/sqlite/queries.sql.go | 184 +++++++++++++ .../pkg/storage/db/sqlite/queries/queries.sql | 25 ++ impl/pkg/storage/db/sqlite/sqlite.go | 249 ++++++++++++++++++ impl/pkg/storage/db/sqlite/sqlite_test.go | 141 ++++++++++ impl/pkg/storage/storage.go | 9 +- impl/pkg/storage/storage_test.go | 19 +- impl/sqlc.yaml | 9 +- 19 files changed, 702 insertions(+), 16 deletions(-) create mode 100644 impl/pkg/storage/db/sqlite/db.go create mode 100644 impl/pkg/storage/db/sqlite/migrations/00001_create_dht_records_table.sql create mode 100644 impl/pkg/storage/db/sqlite/models.go create mode 100644 impl/pkg/storage/db/sqlite/queries.sql.go create mode 100644 impl/pkg/storage/db/sqlite/queries/queries.sql create mode 100644 impl/pkg/storage/db/sqlite/sqlite.go create mode 100644 impl/pkg/storage/db/sqlite/sqlite_test.go diff --git a/impl/config/config.go b/impl/config/config.go index b2f9d5a2..b6d07681 100644 --- a/impl/config/config.go +++ b/impl/config/config.go @@ -73,7 +73,7 @@ func GetDefaultConfig() Config { APIHost: "0.0.0.0", APIPort: 8305, BaseURL: "http://localhost:8305", - StorageURI: "bolt://diddht.db", + StorageURI: "sqlite://diddht.db", Telemetry: false, }, DHTConfig: DHTServiceConfig{ diff --git a/impl/config/config.toml b/impl/config/config.toml index b48239a6..5dc1d96f 100644 --- a/impl/config/config.toml +++ b/impl/config/config.toml @@ -3,7 +3,7 @@ env = "dev" api_host = "0.0.0.0" api_port = 8305 log_level = "debug" -storage_uri = "bolt://diddht.db" +storage_uri = "sqlite://diddht.db" telemetry = false [dht] diff --git a/impl/go.mod b/impl/go.mod index dc08edca..e65b0a81 100644 --- a/impl/go.mod +++ b/impl/go.mod @@ -17,6 +17,7 @@ require ( github.com/joho/godotenv v1.5.1 github.com/lestrrat-go/jwx/v2 v2.1.2 github.com/magefile/mage v1.15.0 + github.com/mattn/go-sqlite3 v1.14.15 github.com/miekg/dns v1.1.62 github.com/mitchellh/go-homedir v1.1.0 github.com/mr-tron/base58 v1.2.0 diff --git a/impl/go.sum b/impl/go.sum index a35abecc..fe5b07bf 100644 --- a/impl/go.sum +++ b/impl/go.sum @@ -309,6 +309,7 @@ github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI= github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mfridman/interpolate v0.0.2 h1:pnuTK7MQIxxFz1Gr+rjSIx9u7qVjf5VOoM/u6BbAxPY= diff --git a/impl/pkg/server/server_test.go b/impl/pkg/server/server_test.go index a902e5d5..47dbd01e 100644 --- a/impl/pkg/server/server_test.go +++ b/impl/pkg/server/server_test.go @@ -22,7 +22,7 @@ const ( func TestHealthCheckAPI(t *testing.T) { shutdown := make(chan os.Signal, 1) serviceConfig, err := config.LoadConfig("") - serviceConfig.ServerConfig.StorageURI = "bolt://health-check.db" + serviceConfig.ServerConfig.StorageURI = "sqlite://health-check.db" serviceConfig.ServerConfig.BaseURL = testServerURL assert.NoError(t, err) diff --git a/impl/pkg/service/dht_test.go b/impl/pkg/service/dht_test.go index 8ef5ebbf..55a86a72 100644 --- a/impl/pkg/service/dht_test.go +++ b/impl/pkg/service/dht_test.go @@ -209,7 +209,7 @@ func TestNoConfig(t *testing.T) { func newDHTService(t *testing.T, id string, bootstrapPeers ...anacrolixdht.Addr) DHTService { defaultConfig := config.GetDefaultConfig() - db, err := storage.NewStorage(fmt.Sprintf("bolt://diddht-test-%s.db", id)) + db, err := storage.NewStorage(fmt.Sprintf("sqlite://diddht-test-%s.db", id)) require.NoError(t, err) require.NotEmpty(t, db) diff --git a/impl/pkg/storage/db/postgres/db.go b/impl/pkg/storage/db/postgres/db.go index 0c307ea8..46317d30 100644 --- a/impl/pkg/storage/db/postgres/db.go +++ b/impl/pkg/storage/db/postgres/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.26.0 +// sqlc v1.27.0 package postgres diff --git a/impl/pkg/storage/db/postgres/models.go b/impl/pkg/storage/db/postgres/models.go index a4d02233..751b76d8 100644 --- a/impl/pkg/storage/db/postgres/models.go +++ b/impl/pkg/storage/db/postgres/models.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.26.0 +// sqlc v1.27.0 package postgres diff --git a/impl/pkg/storage/db/postgres/queries.sql.go b/impl/pkg/storage/db/postgres/queries.sql.go index 9fdfde12..4ae29ff8 100644 --- a/impl/pkg/storage/db/postgres/queries.sql.go +++ b/impl/pkg/storage/db/postgres/queries.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.26.0 +// sqlc v1.27.0 // source: queries.sql package postgres diff --git a/impl/pkg/storage/db/sqlite/db.go b/impl/pkg/storage/db/sqlite/db.go new file mode 100644 index 00000000..daca62a3 --- /dev/null +++ b/impl/pkg/storage/db/sqlite/db.go @@ -0,0 +1,31 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package sqlite + +import ( + "context" + "database/sql" +) + +type DBTX interface { + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx *sql.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/impl/pkg/storage/db/sqlite/migrations/00001_create_dht_records_table.sql b/impl/pkg/storage/db/sqlite/migrations/00001_create_dht_records_table.sql new file mode 100644 index 00000000..3627c017 --- /dev/null +++ b/impl/pkg/storage/db/sqlite/migrations/00001_create_dht_records_table.sql @@ -0,0 +1,17 @@ +-- +goose Up +CREATE TABLE dht_records ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + key BLOB UNIQUE NOT NULL, + value BLOB NOT NULL, + sig BLOB NOT NULL, + seq INTEGER NOT NULL +); + +CREATE TABLE failed_records ( + id BLOB PRIMARY KEY, + failure_count INTEGER NOT NULL +); + +-- +goose Down +DROP TABLE failed_records; +DROP TABLE dht_records; \ No newline at end of file diff --git a/impl/pkg/storage/db/sqlite/models.go b/impl/pkg/storage/db/sqlite/models.go new file mode 100644 index 00000000..f66222b1 --- /dev/null +++ b/impl/pkg/storage/db/sqlite/models.go @@ -0,0 +1,18 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 + +package sqlite + +type DhtRecord struct { + ID int64 + Key []byte + Value []byte + Sig []byte + Seq int64 +} + +type FailedRecord struct { + ID []byte + FailureCount int64 +} diff --git a/impl/pkg/storage/db/sqlite/queries.sql.go b/impl/pkg/storage/db/sqlite/queries.sql.go new file mode 100644 index 00000000..66ffa805 --- /dev/null +++ b/impl/pkg/storage/db/sqlite/queries.sql.go @@ -0,0 +1,184 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.27.0 +// source: queries.sql + +package sqlite + +import ( + "context" +) + +const failedRecordCount = `-- name: FailedRecordCount :one +SELECT count(*) AS exact_count FROM failed_records +` + +func (q *Queries) FailedRecordCount(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, failedRecordCount) + var exact_count int64 + err := row.Scan(&exact_count) + return exact_count, err +} + +const listFailedRecords = `-- name: ListFailedRecords :many +SELECT id, failure_count FROM failed_records +` + +func (q *Queries) ListFailedRecords(ctx context.Context) ([]FailedRecord, error) { + rows, err := q.db.QueryContext(ctx, listFailedRecords) + if err != nil { + return nil, err + } + defer rows.Close() + var items []FailedRecord + for rows.Next() { + var i FailedRecord + if err := rows.Scan(&i.ID, &i.FailureCount); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listRecords = `-- name: ListRecords :many +SELECT id, "key", value, sig, seq FROM dht_records WHERE id > (SELECT id FROM dht_records WHERE dht_records.key = ?) ORDER BY id ASC LIMIT ? +` + +type ListRecordsParams struct { + Key []byte + Limit int64 +} + +func (q *Queries) ListRecords(ctx context.Context, arg ListRecordsParams) ([]DhtRecord, error) { + rows, err := q.db.QueryContext(ctx, listRecords, arg.Key, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []DhtRecord + for rows.Next() { + var i DhtRecord + if err := rows.Scan( + &i.ID, + &i.Key, + &i.Value, + &i.Sig, + &i.Seq, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listRecordsFirstPage = `-- name: ListRecordsFirstPage :many +SELECT id, "key", value, sig, seq FROM dht_records ORDER BY id ASC LIMIT ? +` + +func (q *Queries) ListRecordsFirstPage(ctx context.Context, limit int64) ([]DhtRecord, error) { + rows, err := q.db.QueryContext(ctx, listRecordsFirstPage, limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []DhtRecord + for rows.Next() { + var i DhtRecord + if err := rows.Scan( + &i.ID, + &i.Key, + &i.Value, + &i.Sig, + &i.Seq, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const readRecord = `-- name: ReadRecord :one +SELECT id, "key", value, sig, seq FROM dht_records WHERE key = ? LIMIT 1 +` + +func (q *Queries) ReadRecord(ctx context.Context, key []byte) (DhtRecord, error) { + row := q.db.QueryRowContext(ctx, readRecord, key) + var i DhtRecord + err := row.Scan( + &i.ID, + &i.Key, + &i.Value, + &i.Sig, + &i.Seq, + ) + return i, err +} + +const recordCount = `-- name: RecordCount :one +SELECT count(*) AS exact_count FROM dht_records +` + +func (q *Queries) RecordCount(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, recordCount) + var exact_count int64 + err := row.Scan(&exact_count) + return exact_count, err +} + +const writeFailedRecord = `-- name: WriteFailedRecord :exec +INSERT INTO failed_records(id, failure_count) +VALUES(?, ?) +ON CONFLICT (id) DO UPDATE SET failure_count = failed_records.failure_count + 1 +` + +type WriteFailedRecordParams struct { + ID []byte + FailureCount int64 +} + +func (q *Queries) WriteFailedRecord(ctx context.Context, arg WriteFailedRecordParams) error { + _, err := q.db.ExecContext(ctx, writeFailedRecord, arg.ID, arg.FailureCount) + return err +} + +const writeRecord = `-- name: WriteRecord :exec +INSERT INTO dht_records(key, value, sig, seq) VALUES(?, ?, ?, ?) +` + +type WriteRecordParams struct { + Key []byte + Value []byte + Sig []byte + Seq int64 +} + +func (q *Queries) WriteRecord(ctx context.Context, arg WriteRecordParams) error { + _, err := q.db.ExecContext(ctx, writeRecord, + arg.Key, + arg.Value, + arg.Sig, + arg.Seq, + ) + return err +} diff --git a/impl/pkg/storage/db/sqlite/queries/queries.sql b/impl/pkg/storage/db/sqlite/queries/queries.sql new file mode 100644 index 00000000..53bf81c0 --- /dev/null +++ b/impl/pkg/storage/db/sqlite/queries/queries.sql @@ -0,0 +1,25 @@ +-- name: WriteRecord :exec +INSERT INTO dht_records(key, value, sig, seq) VALUES(?, ?, ?, ?); + +-- name: ReadRecord :one +SELECT * FROM dht_records WHERE key = ? LIMIT 1; + +-- name: ListRecords :many +SELECT * FROM dht_records WHERE id > (SELECT id FROM dht_records WHERE dht_records.key = ?) ORDER BY id ASC LIMIT ?; + +-- name: ListRecordsFirstPage :many +SELECT * FROM dht_records ORDER BY id ASC LIMIT ?; + +-- name: RecordCount :one +SELECT count(*) AS exact_count FROM dht_records; + +-- name: WriteFailedRecord :exec +INSERT INTO failed_records(id, failure_count) +VALUES(?, ?) +ON CONFLICT (id) DO UPDATE SET failure_count = failed_records.failure_count + 1; + +-- name: ListFailedRecords :many +SELECT * FROM failed_records; + +-- name: FailedRecordCount :one +SELECT count(*) AS exact_count FROM failed_records; \ No newline at end of file diff --git a/impl/pkg/storage/db/sqlite/sqlite.go b/impl/pkg/storage/db/sqlite/sqlite.go new file mode 100644 index 00000000..796a3641 --- /dev/null +++ b/impl/pkg/storage/db/sqlite/sqlite.go @@ -0,0 +1,249 @@ +package sqlite + +import ( + "context" + "database/sql" + "embed" + "fmt" + + _ "github.com/mattn/go-sqlite3" + + "github.com/pressly/goose/v3" + "github.com/sirupsen/logrus" + "github.com/tv42/zbase32" + + "github.com/TBD54566975/did-dht/pkg/dht" + "github.com/TBD54566975/did-dht/pkg/telemetry" +) + +//go:embed migrations +var migrations embed.FS + +type SQLite string + +// NewSQLite creates a SQLite-based implementation of storage.Storage +func NewSQLite(uri string) (SQLite, error) { + db := SQLite(uri) + if err := db.migrate(); err != nil { + return db, fmt.Errorf("error migrating sqlite database: %v", err) + } + + return db, nil +} + +func (s SQLite) migrate() error { + db, err := sql.Open("sqlite3", string(s)) + if err != nil { + return err + } + defer db.Close() + + goose.SetBaseFS(migrations) + if err = goose.SetDialect("sqlite"); err != nil { + return err + } + + if err = goose.Up(db, "migrations"); err != nil { + return err + } + + return nil +} + +func (s SQLite) connect(ctx context.Context) (*Queries, *sql.DB, error) { + ctx, span := telemetry.GetTracer().Start(ctx, "sqlite.connect") + defer span.End() + + db, err := sql.Open("sqlite3", string(s)) + if err != nil { + return nil, nil, err + } + + return New(db), db, nil +} + +func (s SQLite) WriteRecord(ctx context.Context, record dht.BEP44Record) error { + ctx, span := telemetry.GetTracer().Start(ctx, "postgres.WriteRecord") + defer span.End() + + queries, db, err := s.connect(ctx) + if err != nil { + return err + } + defer db.Close() + + err = queries.WriteRecord(ctx, WriteRecordParams{ + Key: record.Key[:], + Value: record.Value[:], + Sig: record.Signature[:], + Seq: record.SequenceNumber, + }) + if err != nil { + return err + } + + return nil +} + +func (s SQLite) ReadRecord(ctx context.Context, id string) (*dht.BEP44Record, error) { + ctx, span := telemetry.GetTracer().Start(ctx, "postgres.ReadRecord") + defer span.End() + + queries, db, err := s.connect(ctx) + if err != nil { + return nil, err + } + defer db.Close() + + decodedID, err := zbase32.DecodeString(id) + if err != nil { + return nil, err + } + row, err := queries.ReadRecord(ctx, decodedID) + if err != nil { + return nil, err + } + + record, err := row.Record() + if err != nil { + return nil, err + } + + return record, nil +} + +func (s SQLite) ListRecords(ctx context.Context, nextPageToken []byte, limit int) ([]dht.BEP44Record, []byte, error) { + ctx, span := telemetry.GetTracer().Start(ctx, "postgres.ListRecords") + defer span.End() + + queries, db, err := s.connect(ctx) + if err != nil { + return nil, nil, err + } + defer db.Close() + + var rows []DhtRecord + if nextPageToken == nil { + rows, err = queries.ListRecordsFirstPage(ctx, int64(limit)) + } else { + rows, err = queries.ListRecords(ctx, ListRecordsParams{ + Key: nextPageToken, + Limit: int64(limit), + }) + } + if err != nil { + return nil, nil, err + } + + var records []dht.BEP44Record + for _, row := range rows { + record, err := dht.NewBEP44Record(row.Key, row.Value, row.Sig, row.Seq) + if err != nil { + // TODO: do something useful if this happens + logrus.WithContext(ctx).WithError(err).WithField("record_id", row.ID).Warn("error loading record from database, skipping") + continue + } + + records = append(records, *record) + } + + if len(rows) == limit { + nextPageToken = rows[len(rows)-1].Key + } else { + nextPageToken = nil + } + + return records, nextPageToken, nil +} + +func (row DhtRecord) Record() (*dht.BEP44Record, error) { + return dht.NewBEP44Record(row.Key, row.Value, row.Sig, row.Seq) +} + +func (s SQLite) RecordCount(ctx context.Context) (int, error) { + ctx, span := telemetry.GetTracer().Start(ctx, "postgres.RecordCount") + defer span.End() + + queries, db, err := s.connect(ctx) + if err != nil { + return 0, err + } + defer db.Close() + + count, err := queries.RecordCount(ctx) + if err != nil { + return 0, err + } + + return int(count), nil +} + +func (s SQLite) WriteFailedRecord(ctx context.Context, id string) error { + ctx, span := telemetry.GetTracer().Start(ctx, "postgres.WriteFailedRecord") + defer span.End() + + queries, db, err := s.connect(ctx) + if err != nil { + return err + } + defer db.Close() + + err = queries.WriteFailedRecord(ctx, WriteFailedRecordParams{ + ID: []byte(id), + FailureCount: 1, + }) + if err != nil { + return err + } + + return nil +} + +func (s SQLite) ListFailedRecords(ctx context.Context) ([]dht.FailedRecord, error) { + ctx, span := telemetry.GetTracer().Start(ctx, "postgres.ListFailedRecords") + defer span.End() + + queries, db, err := s.connect(ctx) + if err != nil { + return nil, err + } + defer db.Close() + + rows, err := queries.ListFailedRecords(ctx) + if err != nil { + return nil, err + } + + var failedRecords []dht.FailedRecord + for _, row := range rows { + failedRecords = append(failedRecords, dht.FailedRecord{ + ID: string(row.ID), + Count: int(row.FailureCount), + }) + } + + return failedRecords, nil +} + +func (s SQLite) FailedRecordCount(ctx context.Context) (int, error) { + ctx, span := telemetry.GetTracer().Start(ctx, "postgres.FailedRecordCount") + defer span.End() + + queries, db, err := s.connect(ctx) + if err != nil { + return 0, err + } + defer db.Close() + + count, err := queries.FailedRecordCount(ctx) + if err != nil { + return 0, err + } + + return int(count), nil +} + +func (s SQLite) Close() error { + // no-op, sqlite connection is closed after each request + return nil +} diff --git a/impl/pkg/storage/db/sqlite/sqlite_test.go b/impl/pkg/storage/db/sqlite/sqlite_test.go new file mode 100644 index 00000000..208e9452 --- /dev/null +++ b/impl/pkg/storage/db/sqlite/sqlite_test.go @@ -0,0 +1,141 @@ +package sqlite_test + +import ( + "context" + "net/url" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/TBD54566975/did-dht/internal/did" + "github.com/TBD54566975/did-dht/pkg/dht" + "github.com/TBD54566975/did-dht/pkg/storage" + "github.com/TBD54566975/did-dht/pkg/storage/db/sqlite" +) + +func getTestDB(t *testing.T) storage.Storage { + uri := os.Getenv("TEST_DB") + if uri == "" { + t.SkipNow() + } + + u, err := url.Parse(uri) + require.NoError(t, err) + if u.Scheme != "sqlite" { + t.SkipNow() + } + + db, err := sqlite.NewSQLite(uri) + require.NoError(t, err) + + return db +} + +func TestReadWrite(t *testing.T) { + db := getTestDB(t) + ctx := context.Background() + + beforeCnt, err := db.RecordCount(ctx) + require.NoError(t, err) + + // create a did doc as a packet to store + sk, doc, err := did.GenerateDIDDHT(did.CreateDIDDHTOpts{}) + require.NoError(t, err) + require.NotEmpty(t, doc) + + packet, err := did.DHT(doc.ID).ToDNSPacket(*doc, nil, nil, nil) + require.NoError(t, err) + require.NotEmpty(t, packet) + + putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) + require.NoError(t, err) + require.NotEmpty(t, putMsg) + + r := dht.RecordFromBEP44(putMsg) + + err = db.WriteRecord(ctx, r) + require.NoError(t, err) + + r2, err := db.ReadRecord(ctx, r.ID()) + require.NoError(t, err) + + assert.Equal(t, r.Key, r2.Key) + assert.Equal(t, r.Value, r2.Value) + assert.Equal(t, r.Signature, r2.Signature) + assert.Equal(t, r.SequenceNumber, r2.SequenceNumber) + + afterCnt, err := db.RecordCount(ctx) + require.NoError(t, err) + assert.Equal(t, beforeCnt+1, afterCnt) +} + +func TestDBPagination(t *testing.T) { + db := getTestDB(t) + defer db.Close() + + ctx := context.Background() + + beforeCnt, err := db.RecordCount(ctx) + require.NoError(t, err) + + preTestRecords, _, err := db.ListRecords(ctx, nil, 10) + require.NoError(t, err) + + // store 10 records + for i := 0; i < 10; i++ { + // create a did doc as a packet to store + sk, doc, err := did.GenerateDIDDHT(did.CreateDIDDHTOpts{}) + require.NoError(t, err) + require.NotEmpty(t, doc) + + packet, err := did.DHT(doc.ID).ToDNSPacket(*doc, nil, nil, nil) + assert.NoError(t, err) + assert.NotEmpty(t, packet) + + putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) + require.NoError(t, err) + require.NotEmpty(t, putMsg) + + // create record + record := dht.RecordFromBEP44(putMsg) + + err = db.WriteRecord(ctx, record) + assert.NoError(t, err) + } + + // store 11th document + // create a did doc as a packet to store + sk, doc, err := did.GenerateDIDDHT(did.CreateDIDDHTOpts{}) + require.NoError(t, err) + require.NotEmpty(t, doc) + + packet, err := did.DHT(doc.ID).ToDNSPacket(*doc, nil, nil, nil) + assert.NoError(t, err) + assert.NotEmpty(t, packet) + + putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) + require.NoError(t, err) + require.NotEmpty(t, putMsg) + + // create eleventhRecord + eleventhRecord := dht.RecordFromBEP44(putMsg) + + err = db.WriteRecord(ctx, eleventhRecord) + assert.NoError(t, err) + + // read the first 10 back + page, nextPageToken, err := db.ListRecords(ctx, nil, 10) + assert.NoError(t, err) + assert.Len(t, page, 10) + + page, nextPageToken, err = db.ListRecords(ctx, nextPageToken, 10+len(preTestRecords)) + assert.NoError(t, err) + assert.Nil(t, nextPageToken) + assert.Len(t, page, 1+len(preTestRecords)) + + afterCnt, err := db.RecordCount(ctx) + require.NoError(t, err) + assert.Equal(t, beforeCnt+11, afterCnt) +} diff --git a/impl/pkg/storage/storage.go b/impl/pkg/storage/storage.go index 917aa6d6..0f80597d 100644 --- a/impl/pkg/storage/storage.go +++ b/impl/pkg/storage/storage.go @@ -9,8 +9,8 @@ import ( "github.com/sirupsen/logrus" "github.com/TBD54566975/did-dht/pkg/dht" - "github.com/TBD54566975/did-dht/pkg/storage/db/bolt" "github.com/TBD54566975/did-dht/pkg/storage/db/postgres" + "github.com/TBD54566975/did-dht/pkg/storage/db/sqlite" ) type Storage interface { @@ -32,13 +32,14 @@ func NewStorage(uri string) (Storage, error) { return nil, err } switch u.Scheme { - case "bolt", "": + case "sqlite", "": filename := u.Host if u.Path != "" { filename = fmt.Sprintf("%s/%s", filename, u.Path) } - logrus.WithField("file", filename).Info("using boltdb for storage") - return bolt.NewBolt(filename) + logrus.WithField("file", filename).Info("using sqlite for storage") + return sqlite.NewSQLite(filename) + case "postgres": logrus.WithFields(logrus.Fields{ "host": u.Host, diff --git a/impl/pkg/storage/storage_test.go b/impl/pkg/storage/storage_test.go index 73f1c414..0bf2ccae 100644 --- a/impl/pkg/storage/storage_test.go +++ b/impl/pkg/storage/storage_test.go @@ -9,8 +9,8 @@ import ( "github.com/stretchr/testify/require" "github.com/TBD54566975/did-dht/pkg/storage" - "github.com/TBD54566975/did-dht/pkg/storage/db/bolt" "github.com/TBD54566975/did-dht/pkg/storage/db/postgres" + "github.com/TBD54566975/did-dht/pkg/storage/db/sqlite" ) func TestNewStoragePostgres(t *testing.T) { @@ -30,10 +30,21 @@ func TestNewStoragePostgres(t *testing.T) { assert.IsType(t, postgres.Postgres(""), db) } -func TestNewStorageBolt(t *testing.T) { - db, err := storage.NewStorage("bolt:///tmp/bolt.db") +func TestNewStorageSQLite(t *testing.T) { + uri := os.Getenv("TEST_DB") + if uri == "" { + t.SkipNow() + } + + u, err := url.Parse(uri) + require.NoError(t, err) + if u.Scheme != "sqlite" { + t.SkipNow() + } + + db, err := storage.NewStorage(uri) require.NoError(t, err) - assert.IsType(t, &bolt.Bolt{}, db) + assert.IsType(t, sqlite.SQLite(""), db) } func TestNewStorageUnsupported(t *testing.T) { diff --git a/impl/sqlc.yaml b/impl/sqlc.yaml index b36ecbd9..ee3bac08 100644 --- a/impl/sqlc.yaml +++ b/impl/sqlc.yaml @@ -7,4 +7,11 @@ sql: go: package: "postgres" out: "pkg/storage/db/postgres" - sql_package: "pgx/v5" \ No newline at end of file + sql_package: "pgx/v5" + - engine: "sqlite" + queries: "pkg/storage/db/sqlite/queries" + schema: "pkg/storage/db/sqlite/migrations" + gen: + go: + package: "sqlite" + out: "pkg/storage/db/sqlite" \ No newline at end of file From 0d2e807c76a90dd34db91fb5d2d3fa69cbc4ecaa Mon Sep 17 00:00:00 2001 From: Ada Lundhe Date: Thu, 31 Oct 2024 13:59:25 -0500 Subject: [PATCH 2/4] AL: remove boltdb --- impl/go.mod | 1 - impl/go.sum | 2 - impl/pkg/storage/db/bolt/bolt.go | 290 -------------------------- impl/pkg/storage/db/bolt/bolt_test.go | 224 -------------------- impl/pkg/storage/db/bolt/dht.go | 57 ----- 5 files changed, 574 deletions(-) delete mode 100644 impl/pkg/storage/db/bolt/bolt.go delete mode 100644 impl/pkg/storage/db/bolt/bolt_test.go delete mode 100644 impl/pkg/storage/db/bolt/dht.go diff --git a/impl/go.mod b/impl/go.mod index e65b0a81..636c1a79 100644 --- a/impl/go.mod +++ b/impl/go.mod @@ -30,7 +30,6 @@ require ( github.com/swaggo/files v1.0.1 github.com/swaggo/gin-swagger v1.6.0 github.com/tv42/zbase32 v0.0.0-20220222190657-f76a9fc892fa - go.etcd.io/bbolt v1.3.11 go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.56.0 go.opentelemetry.io/contrib/instrumentation/runtime v0.56.0 go.opentelemetry.io/otel v1.31.0 diff --git a/impl/go.sum b/impl/go.sum index fe5b07bf..03a041bc 100644 --- a/impl/go.sum +++ b/impl/go.sum @@ -476,8 +476,6 @@ github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPyS github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0= -go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= diff --git a/impl/pkg/storage/db/bolt/bolt.go b/impl/pkg/storage/db/bolt/bolt.go deleted file mode 100644 index d8e9d159..00000000 --- a/impl/pkg/storage/db/bolt/bolt.go +++ /dev/null @@ -1,290 +0,0 @@ -package bolt - -import ( - "bytes" - "context" - "encoding/binary" - "time" - - "github.com/goccy/go-json" - - "github.com/pkg/errors" - "github.com/sirupsen/logrus" - bolt "go.etcd.io/bbolt" - - "github.com/TBD54566975/did-dht/pkg/dht" - "github.com/TBD54566975/did-dht/pkg/telemetry" -) - -const ( - dhtNamespace = "dht" - failedNamespace = "failed" -) - -type Bolt struct { - db *bolt.DB -} - -type boltRecord struct { - key, value []byte -} - -// NewBolt creates a BoltDB-based implementation of storage.Storage -func NewBolt(path string) (*Bolt, error) { - if path == "" { - return nil, errors.New("path is required") - } - db, err := bolt.Open(path, 0600, &bolt.Options{Timeout: 3 * time.Second}) - if err != nil { - return nil, err - } - return &Bolt{db: db}, nil -} - -// WriteRecord writes the given record to the storage -// TODO: don't overwrite existing records, store unique seq numbers -func (b *Bolt) WriteRecord(ctx context.Context, record dht.BEP44Record) error { - ctx, span := telemetry.GetTracer().Start(ctx, "bolt.WriteRecord") - defer span.End() - - encoded := encodeRecord(record) - recordBytes, err := json.Marshal(encoded) - if err != nil { - return err - } - - return b.write(ctx, dhtNamespace, record.ID(), recordBytes) -} - -// ReadRecord reads the record with the given id from the storage -func (b *Bolt) ReadRecord(ctx context.Context, id string) (*dht.BEP44Record, error) { - ctx, span := telemetry.GetTracer().Start(ctx, "bolt.ReadRecord") - defer span.End() - - recordBytes, err := b.read(ctx, dhtNamespace, id) - if err != nil { - return nil, err - } - if len(recordBytes) == 0 { - return nil, nil - } - - var b64record base64BEP44Record - if err = json.Unmarshal(recordBytes, &b64record); err != nil { - return nil, err - } - - record, err := b64record.Decode() - if err != nil { - return nil, err - } - - return record, nil -} - -// ListRecords lists all records in the storage -func (b *Bolt) ListRecords(ctx context.Context, nextPageToken []byte, pageSize int) ([]dht.BEP44Record, []byte, error) { - ctx, span := telemetry.GetTracer().Start(ctx, "bolt.ListRecords") - defer span.End() - - boltRecords, err := b.readSeveral(ctx, dhtNamespace, nextPageToken, pageSize) - if err != nil { - return nil, nil, err - } - - var records []dht.BEP44Record - for _, recordBytes := range boltRecords { - var encodedRecord base64BEP44Record - if err = json.Unmarshal(recordBytes.value, &encodedRecord); err != nil { - return nil, nil, err - } - - record, err := encodedRecord.Decode() - if err != nil { - return nil, nil, err - } - - records = append(records, *record) - } - - if len(boltRecords) == pageSize { - nextPageToken = boltRecords[len(boltRecords)-1].key - } else { - nextPageToken = nil - } - - return records, nextPageToken, nil -} - -func (b *Bolt) Close() error { - return b.db.Close() -} - -func (b *Bolt) write(ctx context.Context, namespace string, key string, value []byte) error { - _, span := telemetry.GetTracer().Start(ctx, "bolt.write") - defer span.End() - - return b.db.Update(func(tx *bolt.Tx) error { - bucket, err := tx.CreateBucketIfNotExists([]byte(namespace)) - if err != nil { - return err - } - if err = bucket.Put([]byte(key), value); err != nil { - return err - } - return nil - }) -} - -func (b *Bolt) read(ctx context.Context, namespace, key string) ([]byte, error) { - _, span := telemetry.GetTracer().Start(ctx, "bolt.read") - defer span.End() - - var result []byte - err := b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(namespace)) - if bucket == nil { - logrus.WithContext(ctx).WithField("namespace", namespace).Info("namespace does not exist") - return nil - } - result = bucket.Get([]byte(key)) - return nil - }) - return result, err -} - -func (b *Bolt) readAll(namespace string) (map[string][]byte, error) { - result := make(map[string][]byte) - err := b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(namespace)) - if bucket == nil { - logrus.WithField("namespace", namespace).Warn("namespace does not exist") - return nil - } - cursor := bucket.Cursor() - for k, v := cursor.First(); k != nil; k, v = cursor.Next() { - result[string(k)] = v - } - return nil - }) - return result, err -} - -func (b *Bolt) readSeveral(ctx context.Context, namespace string, after []byte, count int) ([]boltRecord, error) { - _, span := telemetry.GetTracer().Start(ctx, "bolt.readSeveral") - defer span.End() - - var result []boltRecord - err := b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(namespace)) - if bucket == nil { - logrus.WithContext(ctx).WithField("namespace", namespace).Warn("namespace does not exist") - return nil - } - - cursor := bucket.Cursor() - - var k []byte - var v []byte - if after != nil { - cursor.Seek(after) - k, v = cursor.Next() - } else { - k, v = cursor.First() - } - - for ; k != nil; k, v = cursor.Next() { - result = append(result, boltRecord{key: k, value: v}) - if len(result) >= count { - break - } - } - return nil - }) - return result, err -} - -// RecordCount returns the number of records in the storage for the mainline namespace -func (b *Bolt) RecordCount(ctx context.Context) (int, error) { - _, span := telemetry.GetTracer().Start(ctx, "bolt.RecordCount") - defer span.End() - - var count int - err := b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(dhtNamespace)) - if bucket == nil { - logrus.WithContext(ctx).WithField("namespace", dhtNamespace).Warn("namespace does not exist") - return nil - } - count = bucket.Stats().KeyN - return nil - }) - return count, err -} - -func (b *Bolt) WriteFailedRecord(ctx context.Context, id string) error { - _, span := telemetry.GetTracer().Start(ctx, "bolt.WriteFailedRecord") - defer span.End() - - return b.db.Update(func(tx *bolt.Tx) error { - bucket, err := tx.CreateBucketIfNotExists([]byte(failedNamespace)) - if err != nil { - return err - } - - var count int32 = 1 - v := bucket.Get([]byte(id)) - if v != nil { - count = int32(binary.LittleEndian.Uint32(v)) - count++ - } - - buf := new(bytes.Buffer) - if err = binary.Write(buf, binary.LittleEndian, count); err != nil { - return err - } - return bucket.Put([]byte(id), buf.Bytes()) - }) -} - -func (b *Bolt) ListFailedRecords(ctx context.Context) ([]dht.FailedRecord, error) { - _, span := telemetry.GetTracer().Start(ctx, "bolt.ListFailedRecords") - defer span.End() - - var result []dht.FailedRecord - err := b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(failedNamespace)) - if bucket == nil { - logrus.WithField("namespace", failedNamespace).Warn("namespace does not exist") - return nil - } - - cursor := bucket.Cursor() - for k, v := cursor.First(); k != nil; k, v = cursor.Next() { - var count int - if err := binary.Read(bytes.NewReader(v), binary.LittleEndian, &count); err != nil { - return err - } - result = append(result, dht.FailedRecord{ID: string(k), Count: count}) - } - return nil - }) - return result, err -} - -func (b *Bolt) FailedRecordCount(ctx context.Context) (int, error) { - _, span := telemetry.GetTracer().Start(ctx, "bolt.FailedRecordCount") - defer span.End() - - var count int - err := b.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket([]byte(failedNamespace)) - if bucket == nil { - logrus.WithField("namespace", failedNamespace).Warn("namespace does not exist") - return nil - } - count = bucket.Stats().KeyN - return nil - }) - return count, err -} diff --git a/impl/pkg/storage/db/bolt/bolt_test.go b/impl/pkg/storage/db/bolt/bolt_test.go deleted file mode 100644 index 5114fece..00000000 --- a/impl/pkg/storage/db/bolt/bolt_test.go +++ /dev/null @@ -1,224 +0,0 @@ -package bolt - -import ( - "context" - "os" - "testing" - - "github.com/goccy/go-json" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/TBD54566975/did-dht/internal/did" - "github.com/TBD54566975/did-dht/pkg/dht" -) - -func TestBoltDB_ReadWrite(t *testing.T) { - ctx := context.Background() - - db := getTestDB(t) - - // create a name space and a message in it - namespace := "F1" - - team1 := "Red Bull" - players1 := []string{"Max Verstappen", "Sergio PĂ©rez"} - p1Bytes, err := json.Marshal(players1) - assert.NoError(t, err) - - err = db.write(ctx, namespace, team1, p1Bytes) - assert.NoError(t, err) - - // get it back - gotPlayers1, err := db.read(ctx, namespace, team1) - assert.NoError(t, err) - - var players1Result []string - err = json.Unmarshal(gotPlayers1, &players1Result) - assert.NoError(t, err) - assert.EqualValues(t, players1, players1Result) - - // get a value from a oldDHTNamespace that doesn't exist - res, err := db.read(ctx, "bad", "worse") - assert.NoError(t, err) - assert.Empty(t, res) - - // get a value that doesn't exist in the oldDHTNamespace - noValue, err := db.read(ctx, namespace, "Porsche") - assert.NoError(t, err) - assert.Empty(t, noValue) - - // create a second value in the oldDHTNamespace - team2 := "McLaren" - players2 := []string{"Lando Norris", "Daniel Ricciardo"} - p2Bytes, err := json.Marshal(players2) - assert.NoError(t, err) - - err = db.write(ctx, namespace, team2, p2Bytes) - assert.NoError(t, err) - - // get all values from the oldDHTNamespace - gotAll, err := db.readAll(namespace) - assert.NoError(t, err) - assert.True(t, len(gotAll) == 2) - - _, gotRedBull := gotAll[team1] - assert.True(t, gotRedBull) - - _, gotMcLaren := gotAll[team2] - assert.True(t, gotMcLaren) -} - -func TestBoltDB_PrefixAndKeys(t *testing.T) { - ctx := context.Background() - - db := getTestDB(t) - - namespace := "blockchains" - - // set up prefix read test - - dummyData := []byte("dummy") - err := db.write(ctx, namespace, "bitcoin-testnet", dummyData) - assert.NoError(t, err) - - err = db.write(ctx, namespace, "bitcoin-mainnet", dummyData) - assert.NoError(t, err) - - err = db.write(ctx, namespace, "tezos-testnet", dummyData) - assert.NoError(t, err) - - err = db.write(ctx, namespace, "tezos-mainnet", dummyData) - assert.NoError(t, err) -} - -func getTestDB(t *testing.T) *Bolt { - path := "test.db" - db, err := NewBolt(path) - assert.NoError(t, err) - assert.NotEmpty(t, db) - - t.Cleanup(func() { - _ = db.Close() - _ = os.Remove(path) - }) - return db -} - -func TestReadWrite(t *testing.T) { - db := getTestDB(t) - ctx := context.Background() - - beforeCnt, err := db.RecordCount(ctx) - require.NoError(t, err) - - // create a did doc as a packet to store - sk, doc, err := did.GenerateDIDDHT(did.CreateDIDDHTOpts{}) - require.NoError(t, err) - require.NotEmpty(t, doc) - - packet, err := did.DHT(doc.ID).ToDNSPacket(*doc, nil, nil, nil) - require.NoError(t, err) - require.NotEmpty(t, packet) - - putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) - require.NoError(t, err) - require.NotEmpty(t, putMsg) - - r := dht.RecordFromBEP44(putMsg) - - err = db.WriteRecord(ctx, r) - require.NoError(t, err) - - r2, err := db.ReadRecord(ctx, r.ID()) - require.NoError(t, err) - - assert.Equal(t, r.Key, r2.Key) - assert.Equal(t, r.Value, r2.Value) - assert.Equal(t, r.Signature, r2.Signature) - assert.Equal(t, r.SequenceNumber, r2.SequenceNumber) - - afterCnt, err := db.RecordCount(ctx) - assert.NoError(t, err) - assert.Equal(t, beforeCnt+1, afterCnt) -} - -func TestDBPagination(t *testing.T) { - db := getTestDB(t) - defer db.Close() - - ctx := context.Background() - - beforeCnt, err := db.RecordCount(ctx) - require.NoError(t, err) - - preTestRecords, _, err := db.ListRecords(ctx, nil, 10) - require.NoError(t, err) - - // store 10 records - for i := 0; i < 10; i++ { - // create a did doc as a packet to store - sk, doc, err := did.GenerateDIDDHT(did.CreateDIDDHTOpts{}) - require.NoError(t, err) - require.NotEmpty(t, doc) - - packet, err := did.DHT(doc.ID).ToDNSPacket(*doc, nil, nil, nil) - assert.NoError(t, err) - assert.NotEmpty(t, packet) - - putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) - require.NoError(t, err) - require.NotEmpty(t, putMsg) - - // create record - record := dht.RecordFromBEP44(putMsg) - - err = db.WriteRecord(ctx, record) - assert.NoError(t, err) - } - - // store 11th document - // create a did doc as a packet to store - sk, doc, err := did.GenerateDIDDHT(did.CreateDIDDHTOpts{}) - require.NoError(t, err) - require.NotEmpty(t, doc) - - packet, err := did.DHT(doc.ID).ToDNSPacket(*doc, nil, nil, nil) - assert.NoError(t, err) - assert.NotEmpty(t, packet) - - putMsg, err := dht.CreateDNSPublishRequest(sk, *packet) - require.NoError(t, err) - require.NotEmpty(t, putMsg) - - // create eleventhRecord - eleventhRecord := dht.RecordFromBEP44(putMsg) - - err = db.WriteRecord(ctx, eleventhRecord) - assert.NoError(t, err) - - // read the first 10 back - page, nextPageToken, err := db.ListRecords(ctx, nil, 10) - assert.NoError(t, err) - assert.Len(t, page, 10) - - page, nextPageToken, err = db.ListRecords(ctx, nextPageToken, 10+len(preTestRecords)) - assert.NoError(t, err) - assert.Nil(t, nextPageToken) - assert.Len(t, page, 1+len(preTestRecords)) - - afterCnt, err := db.RecordCount(ctx) - assert.NoError(t, err) - assert.Equal(t, beforeCnt+11, afterCnt) -} - -func TestNewBolt(t *testing.T) { - b, err := NewBolt("") - assert.Error(t, err) - assert.Nil(t, b) - - b, err = NewBolt("bolt:///fake/path/bolt.db") - assert.Error(t, err) - assert.Nil(t, b) -} diff --git a/impl/pkg/storage/db/bolt/dht.go b/impl/pkg/storage/db/bolt/dht.go deleted file mode 100644 index 474d31e1..00000000 --- a/impl/pkg/storage/db/bolt/dht.go +++ /dev/null @@ -1,57 +0,0 @@ -package bolt - -import ( - "encoding/base64" - "fmt" - - "github.com/TBD54566975/ssi-sdk/util" - - "github.com/TBD54566975/did-dht/pkg/dht" -) - -var ( - encoding = base64.RawURLEncoding -) - -type base64BEP44Record struct { - // Up to an 1000 byte base64URL encoded string - V string `json:"v" validate:"required"` - // 32 byte base64URL encoded string - K string `json:"k" validate:"required"` - // 64 byte base64URL encoded string - Sig string `json:"sig" validate:"required"` - Seq int64 `json:"seq" validate:"required"` -} - -func encodeRecord(r dht.BEP44Record) base64BEP44Record { - return base64BEP44Record{ - V: encoding.EncodeToString(r.Value[:]), - K: encoding.EncodeToString(r.Key[:]), - Sig: encoding.EncodeToString(r.Signature[:]), - Seq: r.SequenceNumber, - } -} - -func (b base64BEP44Record) Decode() (*dht.BEP44Record, error) { - v, err := encoding.DecodeString(b.V) - if err != nil { - return nil, fmt.Errorf("error parsing bep44 value field: %v", err) - } - - k, err := encoding.DecodeString(b.K) - if err != nil { - return nil, fmt.Errorf("error parsing bep44 key field: %v", err) - } - - sig, err := encoding.DecodeString(b.Sig) - if err != nil { - return nil, fmt.Errorf("error parsing bep44 sig field: %v", err) - } - - record, err := dht.NewBEP44Record(k, v, sig, b.Seq) - if err != nil { - // TODO: do something useful if this happens - return nil, util.LoggingErrorMsg(err, "error loading record from database, skipping") - } - return record, nil -} From 9464f652e12eff3c7aa244691c9d896e6a945689 Mon Sep 17 00:00:00 2001 From: Ada Lundhe Date: Thu, 31 Oct 2024 14:05:34 -0500 Subject: [PATCH 3/4] AL: update trace names to reflect sqlite not postgresql --- impl/pkg/storage/db/sqlite/sqlite.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/impl/pkg/storage/db/sqlite/sqlite.go b/impl/pkg/storage/db/sqlite/sqlite.go index 796a3641..2e2a2b82 100644 --- a/impl/pkg/storage/db/sqlite/sqlite.go +++ b/impl/pkg/storage/db/sqlite/sqlite.go @@ -63,7 +63,7 @@ func (s SQLite) connect(ctx context.Context) (*Queries, *sql.DB, error) { } func (s SQLite) WriteRecord(ctx context.Context, record dht.BEP44Record) error { - ctx, span := telemetry.GetTracer().Start(ctx, "postgres.WriteRecord") + ctx, span := telemetry.GetTracer().Start(ctx, "sqlite.WriteRecord") defer span.End() queries, db, err := s.connect(ctx) @@ -86,7 +86,7 @@ func (s SQLite) WriteRecord(ctx context.Context, record dht.BEP44Record) error { } func (s SQLite) ReadRecord(ctx context.Context, id string) (*dht.BEP44Record, error) { - ctx, span := telemetry.GetTracer().Start(ctx, "postgres.ReadRecord") + ctx, span := telemetry.GetTracer().Start(ctx, "sqlite.ReadRecord") defer span.End() queries, db, err := s.connect(ctx) @@ -113,7 +113,7 @@ func (s SQLite) ReadRecord(ctx context.Context, id string) (*dht.BEP44Record, er } func (s SQLite) ListRecords(ctx context.Context, nextPageToken []byte, limit int) ([]dht.BEP44Record, []byte, error) { - ctx, span := telemetry.GetTracer().Start(ctx, "postgres.ListRecords") + ctx, span := telemetry.GetTracer().Start(ctx, "sqlite.ListRecords") defer span.End() queries, db, err := s.connect(ctx) @@ -161,7 +161,7 @@ func (row DhtRecord) Record() (*dht.BEP44Record, error) { } func (s SQLite) RecordCount(ctx context.Context) (int, error) { - ctx, span := telemetry.GetTracer().Start(ctx, "postgres.RecordCount") + ctx, span := telemetry.GetTracer().Start(ctx, "sqlite.RecordCount") defer span.End() queries, db, err := s.connect(ctx) @@ -179,7 +179,7 @@ func (s SQLite) RecordCount(ctx context.Context) (int, error) { } func (s SQLite) WriteFailedRecord(ctx context.Context, id string) error { - ctx, span := telemetry.GetTracer().Start(ctx, "postgres.WriteFailedRecord") + ctx, span := telemetry.GetTracer().Start(ctx, "sqlite.WriteFailedRecord") defer span.End() queries, db, err := s.connect(ctx) @@ -200,7 +200,7 @@ func (s SQLite) WriteFailedRecord(ctx context.Context, id string) error { } func (s SQLite) ListFailedRecords(ctx context.Context) ([]dht.FailedRecord, error) { - ctx, span := telemetry.GetTracer().Start(ctx, "postgres.ListFailedRecords") + ctx, span := telemetry.GetTracer().Start(ctx, "sqlite.ListFailedRecords") defer span.End() queries, db, err := s.connect(ctx) @@ -226,7 +226,7 @@ func (s SQLite) ListFailedRecords(ctx context.Context) ([]dht.FailedRecord, erro } func (s SQLite) FailedRecordCount(ctx context.Context) (int, error) { - ctx, span := telemetry.GetTracer().Start(ctx, "postgres.FailedRecordCount") + ctx, span := telemetry.GetTracer().Start(ctx, "sqlite.FailedRecordCount") defer span.End() queries, db, err := s.connect(ctx) From 806bd34cb2f0257d03d76bad962f0b25d2138223 Mon Sep 17 00:00:00 2001 From: Ada Lundhe Date: Thu, 31 Oct 2024 14:07:15 -0500 Subject: [PATCH 4/4] AL: update docs to denote SQLite support --- impl/README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/impl/README.md b/impl/README.md index b9018507..f8224af8 100644 --- a/impl/README.md +++ b/impl/README.md @@ -59,6 +59,11 @@ docker run \ did-dht ``` +### SQLite + +To use a SQLite database as the storage backend, set configuration option `storage_uri` to a `sqlite://` URI with +the database connection string. The schema will be created or updated as needed while the program starts. + ### Postgres To use a postgres database as the storage backend, set configuration option `storage_uri` to a `postgres://` URI with