session: allow log configuration using Logger interface.
The driver now provides a Logger interface that allows supplying
a custom logger as part of (Session)ConnConfig.

The driver also provides three implementations of Logger out of the box:
- DefaultLogger, logging only warnings
- DebugLogger, logging everything that can be useful for debugging;
  it is used by default in tests
- NopLogger, logging nothing

Fixes #271
Kulezi committed Sep 23, 2022
1 parent c06f706 commit 5369a73
Showing 12 changed files with 148 additions and 59 deletions.
69 changes: 69 additions & 0 deletions log/logger.go
@@ -0,0 +1,69 @@
package log

import (
"log"
"os"
)

type Logger interface {
Info(v ...any)
Infof(format string, v ...any)
Infoln(v ...any)

Warn(v ...any)
Warnf(format string, v ...any)
Warnln(v ...any)
}

// DefaultLogger only logs warnings and critical errors.
type DefaultLogger struct {
warn *log.Logger
}

func NewDefaultLogger() *DefaultLogger {
res := &DefaultLogger{
warn: log.New(os.Stderr, "WARNING ", log.LstdFlags),
}
return res
}

func (logger *DefaultLogger) Info(v ...any) {}
func (logger *DefaultLogger) Infof(format string, v ...any) {}
func (logger *DefaultLogger) Infoln(v ...any) {}

func (logger *DefaultLogger) Warn(v ...any) { logger.warn.Print(v...) }
func (logger *DefaultLogger) Warnf(format string, v ...any) { logger.warn.Printf(format, v...) }
func (logger *DefaultLogger) Warnln(v ...any) { logger.warn.Println(v...) }

// DebugLogger logs both warnings and information about important events in the driver's runtime.
type DebugLogger struct {
info *log.Logger
warn *log.Logger
}

func NewDebugLogger() *DebugLogger {
res := &DebugLogger{
info: log.New(os.Stderr, "INFO ", log.LstdFlags),
warn: log.New(os.Stderr, "WARNING ", log.LstdFlags),
}
return res
}

func (logger *DebugLogger) Info(v ...any) { logger.info.Print(v...) }
func (logger *DebugLogger) Infof(format string, v ...any) { logger.info.Printf(format, v...) }
func (logger *DebugLogger) Infoln(v ...any) { logger.info.Println(v...) }

func (logger *DebugLogger) Warn(v ...any) { logger.warn.Print(v...) }
func (logger *DebugLogger) Warnf(format string, v ...any) { logger.warn.Printf(format, v...) }
func (logger *DebugLogger) Warnln(v ...any) { logger.warn.Println(v...) }

// NopLogger doesn't log anything.
type NopLogger struct{}

func (NopLogger) Info(v ...any) {}
func (NopLogger) Infof(format string, v ...any) {}
func (NopLogger) Infoln(v ...any) {}

func (NopLogger) Warn(v ...any) {}
func (NopLogger) Warnf(format string, v ...any) {}
func (NopLogger) Warnln(v ...any) {}
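
As an aside (not part of this commit), here is a minimal sketch of a user-defined logger that satisfies the Logger interface above by forwarding both levels to a single standard-library *log.Logger. The type name singleWriterLogger and the standalone main package are hypothetical; only the interface itself comes from the diff.

package main

import (
	stdlog "log"
	"os"

	scyllalog "github.com/scylladb/scylla-go-driver/log"
)

// singleWriterLogger forwards Info and Warn output to one underlying
// *log.Logger, tagging each message with its level.
type singleWriterLogger struct {
	l *stdlog.Logger
}

func (s singleWriterLogger) Info(v ...any)                  { s.l.Print(append([]any{"INFO: "}, v...)...) }
func (s singleWriterLogger) Infof(format string, v ...any)  { s.l.Printf("INFO: "+format, v...) }
func (s singleWriterLogger) Infoln(v ...any)                { s.l.Println(append([]any{"INFO:"}, v...)...) }

func (s singleWriterLogger) Warn(v ...any)                  { s.l.Print(append([]any{"WARNING: "}, v...)...) }
func (s singleWriterLogger) Warnf(format string, v ...any)  { s.l.Printf("WARNING: "+format, v...) }
func (s singleWriterLogger) Warnln(v ...any)                { s.l.Println(append([]any{"WARNING:"}, v...)...) }

// Compile-time check that the sketch satisfies the driver's Logger interface.
var _ scyllalog.Logger = singleWriterLogger{}

func main() {
	logger := singleWriterLogger{l: stdlog.New(os.Stderr, "scylla: ", stdlog.LstdFlags)}
	logger.Warnf("unsupported event type: %v", "TEST_EVENT")
}
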
3 changes: 1 addition & 2 deletions session.go
@@ -3,7 +3,6 @@ package scylla
import (
"context"
"fmt"
- "log"
"sync"
"time"

@@ -271,6 +270,6 @@ func (s *Session) NewTokenAwareDCAwarePolicy(localDC string) transport.HostSelec
}

func (s *Session) Close() {
- log.Println("session: close")
+ s.cfg.Logger.Info("session: close")
s.cluster.Close()
}
7 changes: 6 additions & 1 deletion session_integration_test.go
@@ -18,13 +18,18 @@ import (

"github.com/scylladb/scylla-go-driver/frame"
"github.com/scylladb/scylla-go-driver/frame/response"
+ "github.com/scylladb/scylla-go-driver/log"
"github.com/scylladb/scylla-go-driver/transport"
"go.uber.org/goleak"
)

const TestHost = "192.168.100.100"

- var testingSessionConfig = DefaultSessionConfig("mykeyspace", TestHost)
+ var testingSessionConfig = func() SessionConfig {
+ cfg := DefaultSessionConfig("mykeyspace", TestHost)
+ cfg.Logger = log.NewDebugLogger()
+ return cfg
+ }()

func initKeyspace(ctx context.Context, t testing.TB) {
t.Helper()
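
The test change above already shows the intended wiring. For illustration, a hedged sketch of the same wiring in application code, assuming the driver's root package (package scylla) and its log package are imported as below; keyspace and host are taken from the test constants, and the surrounding setup is hypothetical, not part of this commit.

package main

import (
	scylla "github.com/scylladb/scylla-go-driver"
	"github.com/scylladb/scylla-go-driver/log"
)

func main() {
	// Build a config the same way the integration test does, then override
	// the logger before the session is created.
	cfg := scylla.DefaultSessionConfig("mykeyspace", "192.168.100.100")
	cfg.Logger = log.NewDefaultLogger() // warnings only; log.NewDebugLogger() logs everything
	_ = cfg                             // hand cfg to the session constructor as usual
}
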
37 changes: 18 additions & 19 deletions transport/cluster.go
@@ -3,7 +3,6 @@ package transport
import (
"context"
"fmt"
- "log"
"net"
"sort"
"strconv"
@@ -160,7 +159,7 @@ func NewCluster(ctx context.Context, cfg ConnConfig, p HostSelectionPolicy, e []
}

func (c *Cluster) NewControl(ctx context.Context) (*Conn, error) {
- log.Printf("cluster: open control connection")
+ c.cfg.Logger.Info("cluster: open control connection")
var errs []string
for addr := range c.knownHosts {
conn, err := OpenConn(ctx, addr, nil, c.cfg)
@@ -184,7 +183,7 @@ func (c *Cluster) NewControl(ctx context.Context) (*Conn, error) {
// refreshTopology creates a new topology filled with the result of keyspaceQuery, localQuery and peerQuery.
// The old topology is replaced with the new one atomically to prevent dirty reads.
func (c *Cluster) refreshTopology(ctx context.Context) error {
- log.Printf("cluster: refresh topology")
+ c.cfg.Logger.Infoln("cluster: refresh topology")
rows, err := c.getAllNodesInfo(ctx)
if err != nil {
return fmt.Errorf("query info about nodes in cluster: %w", err)
@@ -236,9 +235,9 @@ func (c *Cluster) refreshTopology(ctx context.Context) error {
}

if ks, ok := t.keyspaces[c.cfg.Keyspace]; ok {
- t.policyInfo.Preprocess(t, ks)
+ t.policyInfo.Preprocess(t, ks, c.cfg.Logger)
} else {
- t.policyInfo.Preprocess(t, keyspace{})
+ t.policyInfo.Preprocess(t, keyspace{}, c.cfg.Logger)
}

c.setTopology(t)
@@ -450,7 +449,7 @@ func (c *Cluster) setTopology(t *topology) {
// of registering handlers for them.
func (c *Cluster) handleEvent(ctx context.Context, r response) {
if r.Err != nil {
- log.Printf("cluster: received event with error: %v", r.Err)
+ c.cfg.Logger.Infof("cluster: received event with error: %v", r.Err)
c.RequestReopenControl()
return
}
@@ -462,17 +461,17 @@ func (c *Cluster) handleEvent(ctx context.Context, r response) {
case *SchemaChange:
// TODO: add schema change.
default:
- log.Printf("cluster: unsupported event type: %v", r.Response)
+ c.cfg.Logger.Warnf("cluster: unsupported event type: %v", r.Response)
}
}

func (c *Cluster) handleTopologyChange(v *TopologyChange) {
- log.Printf("cluster: handle topology change: %+#v", v)
+ c.cfg.Logger.Infof("cluster: handle topology change: %+#v", v)
c.RequestRefresh()
}

func (c *Cluster) handleStatusChange(ctx context.Context, v *StatusChange) {
- log.Printf("cluster: handle status change: %+#v", v)
+ c.cfg.Logger.Infof("cluster: handle status change: %+#v", v)
m := c.Topology().peers
addr := v.Address.String()
if n, ok := m[addr]; ok {
@@ -482,10 +481,10 @@ func (c *Cluster) handleStatusChange(ctx context.Context, v *StatusChange) {
case frame.Down:
n.setStatus(statusDown)
default:
- log.Printf("cluster: status change not supported: %+#v", v)
+ c.cfg.Logger.Warnf("cluster: status change not supported: %+#v", v)
}
} else {
- log.Printf("cluster: unknown node %s received status change: %+#v in topology %v", addr, v, m)
+ c.cfg.Logger.Infof("cluster: unknown node %s received status change: %+#v in topology %v, requesting topology refresh", addr, v, m)
c.RequestRefresh()
}
}
@@ -503,7 +502,7 @@ func (c *Cluster) loop(ctx context.Context) {
case <-c.reopenControlChan:
c.tryReopenControl(ctx)
case <-ctx.Done():
- log.Printf("cluster closing due to: %v", ctx.Err())
+ c.cfg.Logger.Infof("cluster closing due to: %v", ctx.Err())
c.handleClose()
return
case <-c.closeChan:
@@ -523,17 +522,17 @@ func (c *Cluster) tryRefresh(ctx context.Context) {
if err := c.refreshTopology(ctx); err != nil {
c.RequestReopenControl()
time.AfterFunc(tryRefreshInterval, c.RequestRefresh)
- log.Printf("cluster: refresh topology: %v", err)
+ c.cfg.Logger.Infof("cluster: refresh topology: %v", err)
}
}

const tryReopenControlInterval = time.Second

func (c *Cluster) tryReopenControl(ctx context.Context) {
- log.Printf("cluster: reopen control connection")
+ c.cfg.Logger.Infoln("cluster: reopen control connection")
if control, err := c.NewControl(ctx); err != nil {
time.AfterFunc(tryReopenControlInterval, c.RequestReopenControl)
- log.Printf("cluster: failed to reopen control connection: %v", err)
+ c.cfg.Logger.Infof("cluster: failed to reopen control connection: %v", err)
} else {
c.control.Close()
c.control = control
@@ -542,7 +541,7 @@ func (c *Cluster) tryReopenControl(ctx context.Context) {
}

func (c *Cluster) handleClose() {
- log.Printf("cluster: handle cluster close")
+ c.cfg.Logger.Infoln("cluster: handle cluster close")
c.control.Close()
m := c.Topology().peers
for _, n := range m {
@@ -551,23 +550,23 @@ func (c *Cluster) handleClose() {
}

func (c *Cluster) RequestRefresh() {
- log.Printf("cluster: requested to refresh cluster topology")
+ c.cfg.Logger.Infoln("cluster: requested to refresh cluster topology")
select {
case c.refreshChan <- struct{}{}:
default:
}
}

func (c *Cluster) RequestReopenControl() {
- log.Printf("cluster: requested to reopen control connection")
+ c.cfg.Logger.Infoln("cluster: requested to reopen control connection")
select {
case c.reopenControlChan <- struct{}{}:
default:
}
}

func (c *Cluster) Close() {
- log.Printf("cluster: requested to close cluster")
+ c.cfg.Logger.Infoln("cluster: requested to close cluster")
select {
case c.closeChan <- struct{}{}:
default: