Skip to content
This repository has been archived by the owner on Dec 5, 2023. It is now read-only.

Enrich Zipkin db tracing #40

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/cataloguesvc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ func main() {
}

// Data domain.
db, err := sqlx.Open("mysql", *dsn)
var db catalogue.Database
sqlxdb, err := sqlx.Open("mysql", *dsn)
//db = &catalogue.SqlxDb{Db: sqlxdb, Logger: logger}
db = catalogue.NewDatabase(sqlxdb, logger)
db = catalogue.DbTracingMiddleware()(db)
if err != nil {
logger.Log("err", err)
os.Exit(1)
Expand Down
196 changes: 196 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package catalogue

import(
"context"
"database/sql"
"errors"
"strings"

"github.com/go-kit/kit/log"
"github.com/jmoiron/sqlx"
)

type Database interface {
GetSock(ctx context.Context, id string) (Sock, error)
GetSocks(ctx context.Context, tags []string, order string) ([]Sock, error)
GetTags(context.Context) ([]string, error)
CountSocks(ctx context.Context, tags []string) (int, error)
Ping() error
Close() error
}

type SqlxDb interface {
Ping() error
Close() error
Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Prepare(ctx context.Context, query string) (StmtMiddleware, error)
Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}

// SqlxDb meets the Database interface requirements
type db struct {
// db is a reference for the underlying database implementation
//db *sqlx.DB
db SqlxDb
logger log.Logger
}

type sqlxdb struct {
db *sqlx.DB
}

func NewDatabase(sdb *sqlx.DB, logger log.Logger) Database {
return &db {
db: SqlxDbTracingMiddleware()(&sqlxdb{db: sdb}),
logger: logger,
}
}

// ErrNotFound is returned when there is no sock for a given ID.
var ErrNotFound = errors.New("not found")

// ErrDBConnection is returned when connection with the database fails.
var ErrDBConnection = errors.New("database connection error")

var baseQuery = "SELECT sock.sock_id AS id, sock.name, sock.description, sock.price, sock.count, sock.image_url_1, sock.image_url_2, GROUP_CONCAT(tag.name) AS tag_name FROM sock JOIN sock_tag ON sock.sock_id=sock_tag.sock_id JOIN tag ON sock_tag.tag_id=tag.tag_id"

func (db *db) GetSock(ctx context.Context, id string) (Sock, error) {
query := baseQuery + " WHERE sock.sock_id =? GROUP BY sock.sock_id;"

var sock Sock
err := db.db.Get(ctx, &sock, query, id)
if err != nil {
db.logger.Log("database error", err)
return Sock{}, ErrNotFound
}

sock.ImageURL = []string{sock.ImageURL_1, sock.ImageURL_2}
sock.Tags = strings.Split(sock.TagString, ",")
return sock, nil
}

func (db *db) GetSocks(ctx context.Context, tags []string, order string) ([]Sock, error) {
var socks []Sock
query := baseQuery

var args []interface{}

for i, t := range tags {
if i == 0 {
query += " WHERE tag.name=?"
args = append(args, t)
} else {
query += " OR tag.name=?"
args = append(args, t)
}
}

query += " GROUP BY id"

if order != "" {
query += " ORDER BY ?"
args = append(args, order)
}

query += ";"

err := db.db.Select(ctx, &socks, query, args...)
if err != nil {
db.logger.Log("database error", err)
return []Sock{}, ErrDBConnection
}
for i, s := range socks {
socks[i].ImageURL = []string{s.ImageURL_1, s.ImageURL_2}
socks[i].Tags = strings.Split(s.TagString, ",")
}

return socks, nil
}

func (db *db) CountSocks(ctx context.Context, tags []string) (int, error) {
query := "SELECT COUNT(DISTINCT sock.sock_id) FROM sock JOIN sock_tag ON sock.sock_id=sock_tag.sock_id JOIN tag ON sock_tag.tag_id=tag.tag_id"

var args []interface{}

for i, t := range tags {
if i == 0 {
query += " WHERE tag.name=?"
args = append(args, t)
} else {
query += " OR tag.name=?"
args = append(args, t)
}
}

query += ";"

sel, err := db.db.Prepare(ctx, query)

if err != nil {
db.logger.Log("database error", err)
return 0, ErrDBConnection
}
defer sel.Close()

var count int
err = sel.QueryRow(ctx, args...).Scan(&count)

if err != nil {
db.logger.Log("database error", err)
return 0, ErrDBConnection
}
return count, nil
}

func (db *db) GetTags(ctx context.Context) ([]string, error) {
var tags []string
query := "SELECT name FROM tag;"
rows, err := db.db.Query(ctx, query)
if err != nil {
return []string{}, ErrDBConnection
}
var tag string
for rows.Next() {
err = rows.Scan(&tag)
if err != nil {
db.logger.Log("database error", err)
continue
}
tags = append(tags, tag)
}
return tags, nil
}

func (db *db) Ping() error {
return db.db.Ping()
}

func (db *db) Close() error {
return db.db.Close()
}

func (sqlxdb *sqlxdb) Ping() error {
return sqlxdb.db.Ping()
}

func (sqlxdb *sqlxdb) Close() error {
return sqlxdb.db.Close()
}

func (sqlxdb *sqlxdb) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
return sqlxdb.db.Select(dest, query, args...)
}

func (sqlxdb *sqlxdb) Prepare(ctx context.Context, query string) (StmtMiddleware, error) {
sel, err := sqlxdb.db.Prepare(query)
return StmtMiddleware{next: sel}, err
}

func (sqlxdb *sqlxdb) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
return sqlxdb.db.Get(dest, query, args...)
}

func (sqlxdb *sqlxdb) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
return sqlxdb.db.Query(query, args...)
}
138 changes: 138 additions & 0 deletions db_tracing_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package catalogue

import (
"fmt"
"context"
"database/sql"

otext "github.com/opentracing/opentracing-go/ext"
stdopentracing "github.com/opentracing/opentracing-go"
)

// DbMiddleware decorates a Database.
type DbMiddleware func(Database) Database

// SqlxDbMiddleware decorates a SqlxDb.
type SqlxDbMiddleware func(SqlxDb) SqlxDb

// DbTracingMiddleware returns middleware for tracing app level db access.
func DbTracingMiddleware() DbMiddleware {
return func(next Database) Database {
return dbTracingMiddleware{
next: next,
}
}
}

// SqlxDbTracingMiddleware returns middleware for tracing low level db access.
func SqlxDbTracingMiddleware() SqlxDbMiddleware {
return func(next SqlxDb) SqlxDb {
return sqlxDbTracingMiddleware{
next: next,
}
}
}

// dbTracingMiddleware meets the Database interface.
type dbTracingMiddleware struct {
next Database
}

// sqlxDbTracingMiddleware meets the SqlxDb interface.
type sqlxDbTracingMiddleware struct {
next SqlxDb
}

type StmtMiddleware struct {
next *sql.Stmt
}

func (stmt StmtMiddleware) Close() error {
return stmt.next.Close()
}

func (stmt StmtMiddleware) QueryRow(ctx context.Context, args ...interface{}) *sql.Row {
return stmt.next.QueryRow(args...)
}

func (mw dbTracingMiddleware) GetSock(ctx context.Context, id string) (Sock, error) {
span, ctx := startSpan(ctx, "sock from database")
sock, err := mw.next.GetSock(ctx, id)
finishSpan(span, len(fmt.Sprintf("%#v", sock)))
return sock, err
}

func (mw dbTracingMiddleware) GetSocks(ctx context.Context, tags []string, order string) ([]Sock, error) {
span, ctx := startSpan(ctx, "socks from database")
socks, err := mw.next.GetSocks(ctx, tags, order)
finishSpan(span, len(fmt.Sprintf("%#v", tags)))
return socks, err
}

func (mw dbTracingMiddleware) CountSocks(ctx context.Context, tags []string) (int, error) {
span, ctx := startSpan(ctx, "count socks from database")
count, err := mw.next.CountSocks(ctx, tags)
finishSpan(span, 170)
return count, err
}

func (mw dbTracingMiddleware) GetTags(ctx context.Context) ([]string, error) {
span, ctx := startSpan(ctx, "tags from database")
tags, err := mw.next.GetTags(ctx)
finishSpan(span, len(fmt.Sprintf("%#v", tags)))
return tags, err
}

func (mw dbTracingMiddleware) Ping() error {
return mw.next.Ping()
}

func (mw dbTracingMiddleware) Close() error {
return mw.next.Close()
}

func (mw sqlxDbTracingMiddleware) Ping() error {
return mw.next.Ping()
}

func (mw sqlxDbTracingMiddleware) Close() error {
return mw.next.Close()
}

func (mw sqlxDbTracingMiddleware) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
span := stdopentracing.SpanFromContext(ctx)
span.SetTag("db.query.size", len(query))
return mw.next.Select(ctx, dest, query, args...)
}

func (mw sqlxDbTracingMiddleware) Prepare(ctx context.Context, query string) (StmtMiddleware, error) {
span := stdopentracing.SpanFromContext(ctx)
span.SetTag("db.query.size", len(query))
return mw.next.Prepare(ctx, query)
}

func (mw sqlxDbTracingMiddleware) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
span := stdopentracing.SpanFromContext(ctx)
span.SetTag("db.query.size", len(query) + len(fmt.Sprintf("%#v", args)))
return mw.next.Get(ctx, dest, query, args...)
}

func (mw sqlxDbTracingMiddleware) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
span := stdopentracing.SpanFromContext(ctx)
span.SetTag("db.query.size", len(query) + len(fmt.Sprintf("%#v", args)))
return mw.next.Query(ctx, query, args...)
}

func startSpan(ctx context.Context, n string) (stdopentracing.Span, context.Context) {
var span stdopentracing.Span
span, ctx = stdopentracing.StartSpanFromContext(ctx, n)
otext.SpanKindRPCClient.Set(span)
span.SetTag("db.type", "mysql")
span.SetTag("peer.address", "catalogue-db:3306")
return span, ctx
}

func finishSpan(span stdopentracing.Span, size int) {
span.SetTag("db.query.result.size", size)
span.Finish()
}
8 changes: 4 additions & 4 deletions endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func MakeEndpoints(s Service, tracer stdopentracing.Tracer) Endpoints {
func MakeListEndpoint(s Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(listRequest)
socks, err := s.List(req.Tags, req.Order, req.PageNum, req.PageSize)
socks, err := s.List(ctx, req.Tags, req.Order, req.PageNum, req.PageSize)
return listResponse{Socks: socks, Err: err}, err
}
}
Expand All @@ -45,7 +45,7 @@ func MakeListEndpoint(s Service) endpoint.Endpoint {
func MakeCountEndpoint(s Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(countRequest)
n, err := s.Count(req.Tags)
n, err := s.Count(ctx, req.Tags)
return countResponse{N: n, Err: err}, err
}
}
Expand All @@ -54,15 +54,15 @@ func MakeCountEndpoint(s Service) endpoint.Endpoint {
func MakeGetEndpoint(s Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
req := request.(getRequest)
sock, err := s.Get(req.ID)
sock, err := s.Get(ctx, req.ID)
return getResponse{Sock: sock, Err: err}, err
}
}

// MakeTagsEndpoint returns an endpoint via the given service.
func MakeTagsEndpoint(s Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
tags, err := s.Tags()
tags, err := s.Tags(ctx)
return tagsResponse{Tags: tags, Err: err}, err
}
}
Expand Down
Loading