Skip to content

Commit

Permalink
[Feature][SIG-27426] Add hooks to fetch metadata for Databricks queri…
Browse files Browse the repository at this point in the history
…es (#14)

To make debugging Databricks queries easier, add some basic hooks for
fetching query metadata (e.g. session ID, query ID) from Databricks.
The intention here is that Multiplex will register callbacks using
`WithOpenSessionHook` and `WithOperationMetadataHook` that will store
the metadata that Databricks returns as trace tags.

I went with this approach instead of stuffing the metadata in the
Databricks driver's `Rows` struct, mainly because `Rows` doesn't always
get returned, e.g. when executing statements that don't return any data,
or when there are errors.

While the goal here is to simplify debugging failed queries, it's
possible for a Databricks query to fail without giving us a query ID,
namely, if `ExecuteStatement` itself returns an error. This happens when
the query contains a syntax error, for example. In that case, we'll at
least have the session ID, and we can iterate later on fetching query
IDs when `ExecuteStatement` fails. In the meantime, this change enables
us to get query IDs for queries that fail at any later point.

Signed-off-by: Eric Bannatyne <[email protected]>
  • Loading branch information
aldld authored Nov 1, 2022
1 parent de2f224 commit d231b49
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 0 deletions.
2 changes: 2 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (c *Conn) QueryContext(ctx context.Context, q string, args []driver.NamedVa
if err != nil {
return nil, hive.WithStack(err)
}
callOpenSessionHook(ctx, session.GetSessionId())

tmpl := template(q)
stmt, err := statement(tmpl, args)
Expand All @@ -96,6 +97,7 @@ func (c *Conn) ExecContext(ctx context.Context, q string, args []driver.NamedVal
if err != nil {
return nil, hive.WithStack(err)
}
callOpenSessionHook(ctx, session.GetSessionId())

tmpl := template(q)
stmt, err := statement(tmpl, args)
Expand Down
57 changes: 57 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package dbsql

import (
"context"
)

// contextKey is an unexported type for the keys this package stores in a
// context.Context, so they cannot collide with keys set by other packages.
type contextKey string

// Context keys under which the hook callbacks are stored.
const (
	openSessionHook       = contextKey("OPEN_SESSION_HOOK")
	operationMetadataHook = contextKey("OPERATION_METADATA_HOOK")
)

// WithOpenSessionHook returns a context derived from ctx that carries fn as
// the open-session hook. The hook is invoked with the Databricks session ID
// each time a session is acquired for running a query, regardless of whether
// a cached session ID is reused or a new session is created.
func WithOpenSessionHook(ctx context.Context, fn func(string)) context.Context {
	hooked := context.WithValue(ctx, openSessionHook, fn)
	return hooked
}

// callOpenSessionHook passes sessionID to the open-session hook stored on ctx;
// the lookup and invocation are delegated to callContextHook.
func callOpenSessionHook(ctx context.Context, sessionID string) {
	callContextHook[string](ctx, openSessionHook, sessionID)
}

// OperationMetadata is the view of a statement operation that is handed to
// callbacks registered with WithOperationMetadataHook.
type OperationMetadata interface {
	// GetOperationId returns the operation's identifier as a string.
	GetOperationId() string

	// HasResultSet reports whether the operation has a result set.
	HasResultSet() bool

	// RowsAffected returns a row count for the operation.
	// NOTE(review): exact semantics are defined by the implementing type and
	// are not visible here; confirm before relying on the value.
	RowsAffected() float64
}

// WithOperationMetadataHook returns a context derived from ctx that carries fn
// as the operation-metadata hook. The hook is invoked with the operation's
// metadata after an ExecuteStatement thrift request.
func WithOperationMetadataHook(ctx context.Context, fn func(OperationMetadata)) context.Context {
	hooked := context.WithValue(ctx, operationMetadataHook, fn)
	return hooked
}

// callOperationMetadataHook passes metadata to the operation-metadata hook
// stored on ctx; the lookup and invocation are delegated to callContextHook.
func callOperationMetadataHook(ctx context.Context, metadata OperationMetadata) {
	callContextHook[OperationMetadata](ctx, operationMetadataHook, metadata)
}

// callContextHook looks up the value stored in ctx under key and, if it is a
// non-nil function of type func(T), calls it with input.
//
// The call is a no-op when nothing is stored under key, when the stored value
// is not a func(T), or when the stored function is nil. The nil check matters
// because a nil func wrapped in an interface value is itself non-nil: a hook
// registered as nil would otherwise pass the type assertion and panic when
// invoked.
func callContextHook[T any](ctx context.Context, key contextKey, input T) {
	// The comma-ok assertion also yields ok == false for a missing value.
	fn, ok := ctx.Value(key).(func(T))
	if !ok || fn == nil {
		return
	}
	fn(input)
}
4 changes: 4 additions & 0 deletions hive/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ type Operation struct {
h *cli_service.TOperationHandle
}

// GetOperationId returns the identifier of the underlying operation handle,
// converted to a string by guid.
func (op *Operation) GetOperationId() string {
	id := op.h.GetOperationId()
	return guid(id.GUID)
}

// HasResultSet reports whether the operation has a result set.
func (op *Operation) HasResultSet() bool {
return op.h.GetHasResultSet()
Expand Down
4 changes: 4 additions & 0 deletions hive/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ type Session struct {
h *cli_service.TSessionHandle
}

// GetSessionId returns the identifier of the underlying session handle,
// converted to a string by guid.
func (s *Session) GetSessionId() string {
	id := s.h.GetSessionId()
	return guid(id.GUID)
}

// Ping checks the connection
func (s *Session) Ping(ctx context.Context) error {
req := cli_service.TGetInfoReq{
Expand Down
2 changes: 2 additions & 0 deletions statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func query(ctx context.Context, session *hive.Session, stmt string) (driver.Rows
if err != nil {
return nil, hive.WithStack(err)
}
callOperationMetadataHook(ctx, operation)

schema, err := operation.GetResultSetMetadata(ctx)
if err != nil {
Expand All @@ -144,6 +145,7 @@ func exec(ctx context.Context, session *hive.Session, stmt string) (driver.Resul
if err != nil {
return nil, hive.WithStack(err)
}
callOperationMetadataHook(ctx, operation)

if err := operation.Close(ctx); err != nil {
return nil, hive.WithStack(err)
Expand Down

0 comments on commit d231b49

Please sign in to comment.