From fd9e6c2f79e4629202e50b09c8e1c4b72ecb4612 Mon Sep 17 00:00:00 2001 From: embs Date: Sat, 25 Nov 2017 18:35:29 -0300 Subject: [PATCH 1/2] Enrich Zipkin db tracing Add info regarding - database type and address - approximation for queries results size --- cmd/cataloguesvc/main.go | 5 ++- db.go | 48 ++++++++++++++++++++++ db_tracing_middleware.go | 88 ++++++++++++++++++++++++++++++++++++++++ endpoints.go | 8 ++-- logging.go | 17 ++++---- service.go | 30 +++++++------- 6 files changed, 168 insertions(+), 28 deletions(-) create mode 100644 db.go create mode 100644 db_tracing_middleware.go diff --git a/cmd/cataloguesvc/main.go b/cmd/cataloguesvc/main.go index 0c480c7f..69b87a7c 100644 --- a/cmd/cataloguesvc/main.go +++ b/cmd/cataloguesvc/main.go @@ -106,7 +106,10 @@ func main() { } // Data domain. - db, err := sqlx.Open("mysql", *dsn) + var db catalogue.Database + sqlxdb, err := sqlx.Open("mysql", *dsn) + db = &catalogue.SqlxDb{Db: sqlxdb} + db = catalogue.DbTracingMiddleware()(db) if err != nil { logger.Log("err", err) os.Exit(1) diff --git a/db.go b/db.go new file mode 100644 index 00000000..c5c0e7bc --- /dev/null +++ b/db.go @@ -0,0 +1,48 @@ +package catalogue + +import( + "context" + "database/sql" + + "github.com/jmoiron/sqlx" +) + +type Database interface { + Close() error + Ping() error + Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error + Prepare(query string) (StmtMiddleware, error) + Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error + Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) +} + +// SqlxDb meets the Database interface requirements +type SqlxDb struct { + // db is a reference for the underlying database implementation + Db *sqlx.DB +} + +func (sqlxdb *SqlxDb) Close() error { + return sqlxdb.Db.Close() +} + +func (sqlxdb *SqlxDb) Ping() error { + return sqlxdb.Db.Ping() +} + +func (sqlxdb *SqlxDb) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error { + return sqlxdb.Db.Select(dest, query, args...) +} + +func (sqlxdb *SqlxDb) Prepare(query string) (StmtMiddleware, error) { + sel, err := sqlxdb.Db.Prepare(query) + return StmtMiddleware{next: sel}, err +} + +func (sqlxdb *SqlxDb) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error { + return sqlxdb.Db.Get(dest, query, args...) +} + +func (sqlxdb *SqlxDb) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) { + return sqlxdb.Db.Query(query, args...) +} diff --git a/db_tracing_middleware.go b/db_tracing_middleware.go new file mode 100644 index 00000000..4929005a --- /dev/null +++ b/db_tracing_middleware.go @@ -0,0 +1,88 @@ +package catalogue + +import ( + "context" + "database/sql" + "unsafe" + + otext "github.com/opentracing/opentracing-go/ext" + stdopentracing "github.com/opentracing/opentracing-go" +) + +// Middleware decorates a database. +type DbMiddleware func(Database) Database + +// DbTracingMiddleware traces database calls. +func DbTracingMiddleware() DbMiddleware { + return func(next Database) Database { + return dbTracingMiddleware{ + next: next, + } + } +} + +type dbTracingMiddleware struct { + next Database +} + +type StmtMiddleware struct { + next *sql.Stmt +} + +func (stmt StmtMiddleware) Close() error { + return stmt.next.Close() +} + +func (stmt StmtMiddleware) QueryRow(ctx context.Context, args ...interface{}) *sql.Row { + span := startSpan(ctx, "rows from database") + rows := stmt.next.QueryRow(args...) + finishSpan(span, unsafe.Sizeof(rows)) + return rows +} + +func (mw dbTracingMiddleware) Close() error { + return mw.next.Close() +} + +func (mw dbTracingMiddleware) Ping() error { + return mw.next.Ping() +} + +func (mw dbTracingMiddleware) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error { + span := startSpan(ctx, "socks from database") + err := mw.next.Select(ctx, dest, query, args...) + finishSpan(span, unsafe.Sizeof(dest)) + return err +} + +func (mw dbTracingMiddleware) Prepare(query string) (StmtMiddleware, error) { + return mw.next.Prepare(query) +} + +func (mw dbTracingMiddleware) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error { + span := startSpan(ctx, "get from database") + err := mw.next.Get(ctx, dest, query, args...) + finishSpan(span, unsafe.Sizeof(dest)) + return err +} + +func (mw dbTracingMiddleware) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) { + span := startSpan(ctx, "query from database") + rows, err := mw.next.Query(ctx, query, args...) + finishSpan(span, unsafe.Sizeof(rows)) + return rows, err +} + +func startSpan(ctx context.Context, n string) stdopentracing.Span { + var span stdopentracing.Span + span, ctx = stdopentracing.StartSpanFromContext(ctx, n) + otext.SpanKindRPCClient.Set(span) + span.SetTag("db.type", "mysql") + span.SetTag("peer.address", "catalogue-db:3306") + return span +} + +func finishSpan(span stdopentracing.Span, size uintptr) { + span.SetTag("db.query.result.size", size) + span.Finish() +} diff --git a/endpoints.go b/endpoints.go index 1aa2b82b..b6426de5 100644 --- a/endpoints.go +++ b/endpoints.go @@ -36,7 +36,7 @@ func MakeEndpoints(s Service, tracer stdopentracing.Tracer) Endpoints { func MakeListEndpoint(s Service) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (response interface{}, err error) { req := request.(listRequest) - socks, err := s.List(req.Tags, req.Order, req.PageNum, req.PageSize) + socks, err := s.List(ctx, req.Tags, req.Order, req.PageNum, req.PageSize) return listResponse{Socks: socks, Err: err}, err } } @@ -45,7 +45,7 @@ func MakeListEndpoint(s Service) endpoint.Endpoint { func MakeCountEndpoint(s Service) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (response interface{}, err error) { req := request.(countRequest) - n, err := s.Count(req.Tags) + n, err := s.Count(ctx, req.Tags) return countResponse{N: n, Err: err}, err } } @@ -54,7 +54,7 @@ func MakeCountEndpoint(s Service) endpoint.Endpoint { func MakeGetEndpoint(s Service) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (response interface{}, err error) { req := request.(getRequest) - sock, err := s.Get(req.ID) + sock, err := s.Get(ctx, req.ID) return getResponse{Sock: sock, Err: err}, err } } @@ -62,7 +62,7 @@ func MakeGetEndpoint(s Service) endpoint.Endpoint { // MakeTagsEndpoint returns an endpoint via the given service. func MakeTagsEndpoint(s Service) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (response interface{}, err error) { - tags, err := s.Tags() + tags, err := s.Tags(ctx) return tagsResponse{Tags: tags, Err: err}, err } } diff --git a/logging.go b/logging.go index 2f3aa80b..d5365daf 100644 --- a/logging.go +++ b/logging.go @@ -1,6 +1,7 @@ package catalogue import ( + "context" "strings" "time" @@ -22,7 +23,7 @@ type loggingMiddleware struct { logger log.Logger } -func (mw loggingMiddleware) List(tags []string, order string, pageNum, pageSize int) (socks []Sock, err error) { +func (mw loggingMiddleware) List(ctx context.Context, tags []string, order string, pageNum, pageSize int) (socks []Sock, err error) { defer func(begin time.Time) { mw.logger.Log( "method", "List", @@ -35,10 +36,10 @@ func (mw loggingMiddleware) List(tags []string, order string, pageNum, pageSize "took", time.Since(begin), ) }(time.Now()) - return mw.next.List(tags, order, pageNum, pageSize) + return mw.next.List(ctx, tags, order, pageNum, pageSize) } -func (mw loggingMiddleware) Count(tags []string) (n int, err error) { +func (mw loggingMiddleware) Count(ctx context.Context, tags []string) (n int, err error) { defer func(begin time.Time) { mw.logger.Log( "method", "Count", @@ -48,10 +49,10 @@ func (mw loggingMiddleware) Count(tags []string) (n int, err error) { "took", time.Since(begin), ) }(time.Now()) - return mw.next.Count(tags) + return mw.next.Count(ctx, tags) } -func (mw loggingMiddleware) Get(id string) (s Sock, err error) { +func (mw loggingMiddleware) Get(ctx context.Context, id string) (s Sock, err error) { defer func(begin time.Time) { mw.logger.Log( "method", "Get", @@ -61,10 +62,10 @@ func (mw loggingMiddleware) Get(id string) (s Sock, err error) { "took", time.Since(begin), ) }(time.Now()) - return mw.next.Get(id) + return mw.next.Get(ctx, id) } -func (mw loggingMiddleware) Tags() (tags []string, err error) { +func (mw loggingMiddleware) Tags(ctx context.Context) (tags []string, err error) { defer func(begin time.Time) { mw.logger.Log( "method", "Tags", @@ -73,7 +74,7 @@ func (mw loggingMiddleware) Tags() (tags []string, err error) { "took", time.Since(begin), ) }(time.Now()) - return mw.next.Tags() + return mw.next.Tags(ctx) } func (mw loggingMiddleware) Health() (health []Health) { diff --git a/service.go b/service.go index 65957943..b32c2479 100644 --- a/service.go +++ b/service.go @@ -4,21 +4,21 @@ package catalogue // catalogue service. Everything here is agnostic to the transport (HTTP). import ( + "context" "errors" "strings" "time" "github.com/go-kit/kit/log" - "github.com/jmoiron/sqlx" ) // Service is the catalogue service, providing read operations on a saleable // catalogue of sock products. type Service interface { - List(tags []string, order string, pageNum, pageSize int) ([]Sock, error) // GET /catalogue - Count(tags []string) (int, error) // GET /catalogue/size - Get(id string) (Sock, error) // GET /catalogue/{id} - Tags() ([]string, error) // GET /tags + List(ctx context.Context, tags []string, order string, pageNum, pageSize int) ([]Sock, error) // GET /catalogue + Count(ctx context.Context, tags []string) (int, error) // GET /catalogue/size + Get(ctx context.Context, id string) (Sock, error) // GET /catalogue/{id} + Tags(ctx context.Context) ([]string, error) // GET /tags Health() []Health // GET /health } @@ -56,7 +56,7 @@ var baseQuery = "SELECT sock.sock_id AS id, sock.name, sock.description, sock.pr // NewCatalogueService returns an implementation of the Service interface, // with connection to an SQL database. -func NewCatalogueService(db *sqlx.DB, logger log.Logger) Service { +func NewCatalogueService(db Database, logger log.Logger) Service { return &catalogueService{ db: db, logger: logger, @@ -64,11 +64,11 @@ func NewCatalogueService(db *sqlx.DB, logger log.Logger) Service { } type catalogueService struct { - db *sqlx.DB + db Database logger log.Logger } -func (s *catalogueService) List(tags []string, order string, pageNum, pageSize int) ([]Sock, error) { +func (s *catalogueService) List(ctx context.Context, tags []string, order string, pageNum, pageSize int) ([]Sock, error) { var socks []Sock query := baseQuery @@ -93,7 +93,7 @@ func (s *catalogueService) List(tags []string, order string, pageNum, pageSize i query += ";" - err := s.db.Select(&socks, query, args...) + err := s.db.Select(ctx, &socks, query, args...) if err != nil { s.logger.Log("database error", err) return []Sock{}, ErrDBConnection @@ -111,7 +111,7 @@ func (s *catalogueService) List(tags []string, order string, pageNum, pageSize i return socks, nil } -func (s *catalogueService) Count(tags []string) (int, error) { +func (s *catalogueService) Count(ctx context.Context, tags []string) (int, error) { query := "SELECT COUNT(DISTINCT sock.sock_id) FROM sock JOIN sock_tag ON sock.sock_id=sock_tag.sock_id JOIN tag ON sock_tag.tag_id=tag.tag_id" var args []interface{} @@ -137,7 +137,7 @@ func (s *catalogueService) Count(tags []string) (int, error) { defer sel.Close() var count int - err = sel.QueryRow(args...).Scan(&count) + err = sel.QueryRow(ctx, args...).Scan(&count) if err != nil { s.logger.Log("database error", err) @@ -147,11 +147,11 @@ func (s *catalogueService) Count(tags []string) (int, error) { return count, nil } -func (s *catalogueService) Get(id string) (Sock, error) { +func (s *catalogueService) Get(ctx context.Context, id string) (Sock, error) { query := baseQuery + " WHERE sock.sock_id =? GROUP BY sock.sock_id;" var sock Sock - err := s.db.Get(&sock, query, id) + err := s.db.Get(ctx, &sock, query, id) if err != nil { s.logger.Log("database error", err) return Sock{}, ErrNotFound @@ -181,10 +181,10 @@ func (s *catalogueService) Health() []Health { return health } -func (s *catalogueService) Tags() ([]string, error) { +func (s *catalogueService) Tags(ctx context.Context) ([]string, error) { var tags []string query := "SELECT name FROM tag;" - rows, err := s.db.Query(query) + rows, err := s.db.Query(ctx, query) if err != nil { s.logger.Log("database error", err) return []string{}, ErrDBConnection From 969664772dd9e77ae10e1edcd9b6850ee7ef6a28 Mon Sep 17 00:00:00 2001 From: embs Date: Thu, 30 Nov 2017 14:42:54 -0300 Subject: [PATCH 2/2] Extract db logic from service to db Also - Improve tallying objects sizes for db.query.result.size span tag - Trace db.query.size span tag --- cmd/cataloguesvc/main.go | 3 +- db.go | 178 +++++++++++++++++++++++++++++++++++---- db_tracing_middleware.go | 106 +++++++++++++++++------ service.go | 100 ++-------------------- 4 files changed, 250 insertions(+), 137 deletions(-) diff --git a/cmd/cataloguesvc/main.go b/cmd/cataloguesvc/main.go index 69b87a7c..53bef638 100644 --- a/cmd/cataloguesvc/main.go +++ b/cmd/cataloguesvc/main.go @@ -108,7 +108,8 @@ func main() { // Data domain. var db catalogue.Database sqlxdb, err := sqlx.Open("mysql", *dsn) - db = &catalogue.SqlxDb{Db: sqlxdb} + //db = &catalogue.SqlxDb{Db: sqlxdb, Logger: logger} + db = catalogue.NewDatabase(sqlxdb, logger) db = catalogue.DbTracingMiddleware()(db) if err != nil { logger.Log("err", err) diff --git a/db.go b/db.go index c5c0e7bc..227ea58d 100644 --- a/db.go +++ b/db.go @@ -3,46 +3,194 @@ package catalogue import( "context" "database/sql" + "errors" + "strings" + "github.com/go-kit/kit/log" "github.com/jmoiron/sqlx" ) type Database interface { + GetSock(ctx context.Context, id string) (Sock, error) + GetSocks(ctx context.Context, tags []string, order string) ([]Sock, error) + GetTags(context.Context) ([]string, error) + CountSocks(ctx context.Context, tags []string) (int, error) + Ping() error Close() error +} + +type SqlxDb interface { Ping() error + Close() error Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error - Prepare(query string) (StmtMiddleware, error) + Prepare(ctx context.Context, query string) (StmtMiddleware, error) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) } // SqlxDb meets the Database interface requirements -type SqlxDb struct { +type db struct { // db is a reference for the underlying database implementation - Db *sqlx.DB + //db *sqlx.DB + db SqlxDb + logger log.Logger +} + +type sqlxdb struct { + db *sqlx.DB +} + +func NewDatabase(sdb *sqlx.DB, logger log.Logger) Database { + return &db { + db: SqlxDbTracingMiddleware()(&sqlxdb{db: sdb}), + logger: logger, + } +} + +// ErrNotFound is returned when there is no sock for a given ID. +var ErrNotFound = errors.New("not found") + +// ErrDBConnection is returned when connection with the database fails. +var ErrDBConnection = errors.New("database connection error") + +var baseQuery = "SELECT sock.sock_id AS id, sock.name, sock.description, sock.price, sock.count, sock.image_url_1, sock.image_url_2, GROUP_CONCAT(tag.name) AS tag_name FROM sock JOIN sock_tag ON sock.sock_id=sock_tag.sock_id JOIN tag ON sock_tag.tag_id=tag.tag_id" + +func (db *db) GetSock(ctx context.Context, id string) (Sock, error) { + query := baseQuery + " WHERE sock.sock_id =? GROUP BY sock.sock_id;" + + var sock Sock + err := db.db.Get(ctx, &sock, query, id) + if err != nil { + db.logger.Log("database error", err) + return Sock{}, ErrNotFound + } + + sock.ImageURL = []string{sock.ImageURL_1, sock.ImageURL_2} + sock.Tags = strings.Split(sock.TagString, ",") + return sock, nil +} + +func (db *db) GetSocks(ctx context.Context, tags []string, order string) ([]Sock, error) { + var socks []Sock + query := baseQuery + + var args []interface{} + + for i, t := range tags { + if i == 0 { + query += " WHERE tag.name=?" + args = append(args, t) + } else { + query += " OR tag.name=?" + args = append(args, t) + } + } + + query += " GROUP BY id" + + if order != "" { + query += " ORDER BY ?" + args = append(args, order) + } + + query += ";" + + err := db.db.Select(ctx, &socks, query, args...) + if err != nil { + db.logger.Log("database error", err) + return []Sock{}, ErrDBConnection + } + for i, s := range socks { + socks[i].ImageURL = []string{s.ImageURL_1, s.ImageURL_2} + socks[i].Tags = strings.Split(s.TagString, ",") + } + + return socks, nil +} + +func (db *db) CountSocks(ctx context.Context, tags []string) (int, error) { + query := "SELECT COUNT(DISTINCT sock.sock_id) FROM sock JOIN sock_tag ON sock.sock_id=sock_tag.sock_id JOIN tag ON sock_tag.tag_id=tag.tag_id" + + var args []interface{} + + for i, t := range tags { + if i == 0 { + query += " WHERE tag.name=?" + args = append(args, t) + } else { + query += " OR tag.name=?" + args = append(args, t) + } + } + + query += ";" + + sel, err := db.db.Prepare(ctx, query) + + if err != nil { + db.logger.Log("database error", err) + return 0, ErrDBConnection + } + defer sel.Close() + + var count int + err = sel.QueryRow(ctx, args...).Scan(&count) + + if err != nil { + db.logger.Log("database error", err) + return 0, ErrDBConnection + } + return count, nil +} + +func (db *db) GetTags(ctx context.Context) ([]string, error) { + var tags []string + query := "SELECT name FROM tag;" + rows, err := db.db.Query(ctx, query) + if err != nil { + return []string{}, ErrDBConnection + } + var tag string + for rows.Next() { + err = rows.Scan(&tag) + if err != nil { + db.logger.Log("database error", err) + continue + } + tags = append(tags, tag) + } + return tags, nil +} + +func (db *db) Ping() error { + return db.db.Ping() +} + +func (db *db) Close() error { + return db.db.Close() } -func (sqlxdb *SqlxDb) Close() error { - return sqlxdb.Db.Close() +func (sqlxdb *sqlxdb) Ping() error { + return sqlxdb.db.Ping() } -func (sqlxdb *SqlxDb) Ping() error { - return sqlxdb.Db.Ping() +func (sqlxdb *sqlxdb) Close() error { + return sqlxdb.db.Close() } -func (sqlxdb *SqlxDb) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error { - return sqlxdb.Db.Select(dest, query, args...) +func (sqlxdb *sqlxdb) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error { + return sqlxdb.db.Select(dest, query, args...) } -func (sqlxdb *SqlxDb) Prepare(query string) (StmtMiddleware, error) { - sel, err := sqlxdb.Db.Prepare(query) +func (sqlxdb *sqlxdb) Prepare(ctx context.Context, query string) (StmtMiddleware, error) { + sel, err := sqlxdb.db.Prepare(query) return StmtMiddleware{next: sel}, err } -func (sqlxdb *SqlxDb) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error { - return sqlxdb.Db.Get(dest, query, args...) +func (sqlxdb *sqlxdb) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error { + return sqlxdb.db.Get(dest, query, args...) } -func (sqlxdb *SqlxDb) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) { - return sqlxdb.Db.Query(query, args...) +func (sqlxdb *sqlxdb) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) { + return sqlxdb.db.Query(query, args...) } diff --git a/db_tracing_middleware.go b/db_tracing_middleware.go index 4929005a..292b92fc 100644 --- a/db_tracing_middleware.go +++ b/db_tracing_middleware.go @@ -1,18 +1,21 @@ package catalogue import ( + "fmt" "context" "database/sql" - "unsafe" otext "github.com/opentracing/opentracing-go/ext" stdopentracing "github.com/opentracing/opentracing-go" ) -// Middleware decorates a database. +// DbMiddleware decorates a Database. type DbMiddleware func(Database) Database -// DbTracingMiddleware traces database calls. +// SqlxDbMiddleware decorates a SqlxDb. +type SqlxDbMiddleware func(SqlxDb) SqlxDb + +// DbTracingMiddleware returns middleware for tracing app level db access. func DbTracingMiddleware() DbMiddleware { return func(next Database) Database { return dbTracingMiddleware{ @@ -21,10 +24,25 @@ func DbTracingMiddleware() DbMiddleware { } } +// SqlxDbTracingMiddleware returns middleware for tracing low level db access. +func SqlxDbTracingMiddleware() SqlxDbMiddleware { + return func(next SqlxDb) SqlxDb { + return sqlxDbTracingMiddleware{ + next: next, + } + } +} + +// dbTracingMiddleware meets the Database interface. type dbTracingMiddleware struct { next Database } +// sqlxDbTracingMiddleware meets the SqlxDb interface. +type sqlxDbTracingMiddleware struct { + next SqlxDb +} + type StmtMiddleware struct { next *sql.Stmt } @@ -34,55 +52,87 @@ func (stmt StmtMiddleware) Close() error { } func (stmt StmtMiddleware) QueryRow(ctx context.Context, args ...interface{}) *sql.Row { - span := startSpan(ctx, "rows from database") - rows := stmt.next.QueryRow(args...) - finishSpan(span, unsafe.Sizeof(rows)) - return rows + return stmt.next.QueryRow(args...) +} + +func (mw dbTracingMiddleware) GetSock(ctx context.Context, id string) (Sock, error) { + span, ctx := startSpan(ctx, "sock from database") + sock, err := mw.next.GetSock(ctx, id) + finishSpan(span, len(fmt.Sprintf("%#v", sock))) + return sock, err +} + +func (mw dbTracingMiddleware) GetSocks(ctx context.Context, tags []string, order string) ([]Sock, error) { + span, ctx := startSpan(ctx, "socks from database") + socks, err := mw.next.GetSocks(ctx, tags, order) + finishSpan(span, len(fmt.Sprintf("%#v", tags))) + return socks, err +} + +func (mw dbTracingMiddleware) CountSocks(ctx context.Context, tags []string) (int, error) { + span, ctx := startSpan(ctx, "count socks from database") + count, err := mw.next.CountSocks(ctx, tags) + finishSpan(span, 170) + return count, err +} + +func (mw dbTracingMiddleware) GetTags(ctx context.Context) ([]string, error) { + span, ctx := startSpan(ctx, "tags from database") + tags, err := mw.next.GetTags(ctx) + finishSpan(span, len(fmt.Sprintf("%#v", tags))) + return tags, err +} + +func (mw dbTracingMiddleware) Ping() error { + return mw.next.Ping() } func (mw dbTracingMiddleware) Close() error { return mw.next.Close() } -func (mw dbTracingMiddleware) Ping() error { +func (mw sqlxDbTracingMiddleware) Ping() error { return mw.next.Ping() } -func (mw dbTracingMiddleware) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error { - span := startSpan(ctx, "socks from database") - err := mw.next.Select(ctx, dest, query, args...) - finishSpan(span, unsafe.Sizeof(dest)) - return err +func (mw sqlxDbTracingMiddleware) Close() error { + return mw.next.Close() +} + +func (mw sqlxDbTracingMiddleware) Select(ctx context.Context, dest interface{}, query string, args ...interface{}) error { + span := stdopentracing.SpanFromContext(ctx) + span.SetTag("db.query.size", len(query)) + return mw.next.Select(ctx, dest, query, args...) } -func (mw dbTracingMiddleware) Prepare(query string) (StmtMiddleware, error) { - return mw.next.Prepare(query) +func (mw sqlxDbTracingMiddleware) Prepare(ctx context.Context, query string) (StmtMiddleware, error) { + span := stdopentracing.SpanFromContext(ctx) + span.SetTag("db.query.size", len(query)) + return mw.next.Prepare(ctx, query) } -func (mw dbTracingMiddleware) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error { - span := startSpan(ctx, "get from database") - err := mw.next.Get(ctx, dest, query, args...) - finishSpan(span, unsafe.Sizeof(dest)) - return err +func (mw sqlxDbTracingMiddleware) Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error { + span := stdopentracing.SpanFromContext(ctx) + span.SetTag("db.query.size", len(query) + len(fmt.Sprintf("%#v", args))) + return mw.next.Get(ctx, dest, query, args...) } -func (mw dbTracingMiddleware) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) { - span := startSpan(ctx, "query from database") - rows, err := mw.next.Query(ctx, query, args...) - finishSpan(span, unsafe.Sizeof(rows)) - return rows, err +func (mw sqlxDbTracingMiddleware) Query(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) { + span := stdopentracing.SpanFromContext(ctx) + span.SetTag("db.query.size", len(query) + len(fmt.Sprintf("%#v", args))) + return mw.next.Query(ctx, query, args...) } -func startSpan(ctx context.Context, n string) stdopentracing.Span { +func startSpan(ctx context.Context, n string) (stdopentracing.Span, context.Context) { var span stdopentracing.Span span, ctx = stdopentracing.StartSpanFromContext(ctx, n) otext.SpanKindRPCClient.Set(span) span.SetTag("db.type", "mysql") span.SetTag("peer.address", "catalogue-db:3306") - return span + return span, ctx } -func finishSpan(span stdopentracing.Span, size uintptr) { +func finishSpan(span stdopentracing.Span, size int) { span.SetTag("db.query.result.size", size) span.Finish() } diff --git a/service.go b/service.go index b32c2479..7b08be9f 100644 --- a/service.go +++ b/service.go @@ -5,8 +5,6 @@ package catalogue import ( "context" - "errors" - "strings" "time" "github.com/go-kit/kit/log" @@ -46,14 +44,6 @@ type Health struct { Time string `json:"time"` } -// ErrNotFound is returned when there is no sock for a given ID. -var ErrNotFound = errors.New("not found") - -// ErrDBConnection is returned when connection with the database fails. -var ErrDBConnection = errors.New("database connection error") - -var baseQuery = "SELECT sock.sock_id AS id, sock.name, sock.description, sock.price, sock.count, sock.image_url_1, sock.image_url_2, GROUP_CONCAT(tag.name) AS tag_name FROM sock JOIN sock_tag ON sock.sock_id=sock_tag.sock_id JOIN tag ON sock_tag.tag_id=tag.tag_id" - // NewCatalogueService returns an implementation of the Service interface, // with connection to an SQL database. func NewCatalogueService(db Database, logger log.Logger) Service { @@ -69,39 +59,10 @@ type catalogueService struct { } func (s *catalogueService) List(ctx context.Context, tags []string, order string, pageNum, pageSize int) ([]Sock, error) { - var socks []Sock - query := baseQuery - - var args []interface{} - - for i, t := range tags { - if i == 0 { - query += " WHERE tag.name=?" - args = append(args, t) - } else { - query += " OR tag.name=?" - args = append(args, t) - } - } - - query += " GROUP BY id" - - if order != "" { - query += " ORDER BY ?" - args = append(args, order) - } - - query += ";" - - err := s.db.Select(ctx, &socks, query, args...) + socks, err := s.db.GetSocks(ctx, tags, order) if err != nil { - s.logger.Log("database error", err) return []Sock{}, ErrDBConnection } - for i, s := range socks { - socks[i].ImageURL = []string{s.ImageURL_1, s.ImageURL_2} - socks[i].Tags = strings.Split(s.TagString, ",") - } // DEMO: Change 0 to 850 time.Sleep(0 * time.Millisecond) @@ -112,54 +73,19 @@ func (s *catalogueService) List(ctx context.Context, tags []string, order string } func (s *catalogueService) Count(ctx context.Context, tags []string) (int, error) { - query := "SELECT COUNT(DISTINCT sock.sock_id) FROM sock JOIN sock_tag ON sock.sock_id=sock_tag.sock_id JOIN tag ON sock_tag.tag_id=tag.tag_id" - - var args []interface{} - - for i, t := range tags { - if i == 0 { - query += " WHERE tag.name=?" - args = append(args, t) - } else { - query += " OR tag.name=?" - args = append(args, t) - } - } - - query += ";" - - sel, err := s.db.Prepare(query) - + count, err := s.db.CountSocks(ctx, tags) if err != nil { - s.logger.Log("database error", err) - return 0, ErrDBConnection + return 0, err } - defer sel.Close() - - var count int - err = sel.QueryRow(ctx, args...).Scan(&count) - - if err != nil { - s.logger.Log("database error", err) - return 0, ErrDBConnection - } - return count, nil } func (s *catalogueService) Get(ctx context.Context, id string) (Sock, error) { - query := baseQuery + " WHERE sock.sock_id =? GROUP BY sock.sock_id;" - - var sock Sock - err := s.db.Get(ctx, &sock, query, id) + sock, err := s.db.GetSock(ctx, id) if err != nil { - s.logger.Log("database error", err) - return Sock{}, ErrNotFound + return Sock{}, err } - sock.ImageURL = []string{sock.ImageURL_1, sock.ImageURL_2} - sock.Tags = strings.Split(sock.TagString, ",") - return sock, nil } @@ -182,21 +108,9 @@ func (s *catalogueService) Health() []Health { } func (s *catalogueService) Tags(ctx context.Context) ([]string, error) { - var tags []string - query := "SELECT name FROM tag;" - rows, err := s.db.Query(ctx, query) + tags, err := s.db.GetTags(ctx) if err != nil { - s.logger.Log("database error", err) - return []string{}, ErrDBConnection - } - var tag string - for rows.Next() { - err = rows.Scan(&tag) - if err != nil { - s.logger.Log("database error", err) - continue - } - tags = append(tags, tag) + return []string{}, err } return tags, nil }