Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When a SQL error occurs, clean up all prepared statement. #2936

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions storage/mysql/log_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ const (
)

var (
once sync.Once
queuedCounter monitoring.Counter
queuedDupCounter monitoring.Counter
dequeuedCounter monitoring.Counter
once sync.Once
queuedCounter monitoring.Counter
queuedDupCounter monitoring.Counter
dequeuedCounter monitoring.Counter
clearedAllStmtCounter monitoring.Counter

queueLatency monitoring.Histogram
queueInsertLatency monitoring.Histogram
Expand All @@ -101,6 +102,7 @@ func createMetrics(mf monitoring.MetricFactory) {
queuedCounter = mf.NewCounter("mysql_queued_leaves", "Number of leaves queued", logIDLabel)
queuedDupCounter = mf.NewCounter("mysql_queued_dup_leaves", "Number of duplicate leaves queued", logIDLabel)
dequeuedCounter = mf.NewCounter("mysql_dequeued_leaves", "Number of leaves dequeued", logIDLabel)
clearedAllStmtCounter = mf.NewCounter("mysql_cleared_all_prepared_statements", "Number of all prepared statements cleared")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: -> "number of times the prepared-statement cache has been cleared"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


queueLatency = mf.NewHistogram("mysql_queue_leaves_latency", "Latency of queue leaves operation in seconds", logIDLabel)
queueInsertLatency = mf.NewHistogram("mysql_queue_leaves_latency_insert", "Latency of insertion part of queue leaves operation in seconds", logIDLabel)
Expand Down Expand Up @@ -741,6 +743,7 @@ func (t *logTreeTX) getLeavesByHashInternal(ctx context.Context, leafHashes [][]
args = append(args, t.treeID)
rows, err := stx.QueryContext(ctx, args...)
if err != nil {
t.ls.cleanAllStmt()
klog.Warningf("Query() %s hash = %v", desc, err)
return nil, err
}
Expand Down
23 changes: 21 additions & 2 deletions storage/mysql/tree_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,25 @@ func expandPlaceholderSQL(sql string, num int, first, rest string) string {
return strings.Replace(sql, placeholderSQL, parameters, 1)
}

// clearAllStmt clean up all sql.Stmt in cache
func (m *mySQLTreeStorage) cleanAllStmt() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would clearStmtCache be a clearer name for this func?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

m.statementMutex.Lock()
defer m.statementMutex.Unlock()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some kind of logging for this would be really useful. This should be a rare event in an otherwise healthy system:

klog.Info("Clearing all prepared statements")

Actually, for bonus points I think it's worth incrementing a counter whenever we clear statements, which should appear on monitoring dashboards and allow correlation with potential impacts such as increased latency or errors. Take a look at queuedCounter in this class as an example (though we won't need a logID for this as it's effectively global).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

klog.Info("Clearing all prepared statements")

for _, ns := range m.statements {
for _, s := range ns {
if err := s.Close(); err != nil {
klog.Warningf("Failed to close stmt: %s", err)
}
}
}

m.statements = make(map[string]map[int]*sql.Stmt)
clearedAllStmtCounter.Inc()
}

// getStmt creates and caches sql.Stmt structs based on the passed in statement
// and number of bound arguments.
// TODO(al,martin): consider pulling this all out as a separate unit for reuse
Expand All @@ -114,8 +133,6 @@ func (m *mySQLTreeStorage) getStmt(ctx context.Context, statement string, num in

if m.statements[statement] != nil {
if m.statements[statement][num] != nil {
// TODO(al,martin): we'll possibly need to expire Stmts from the cache,
// e.g. when DB connections break etc.
return m.statements[statement][num], nil
}
} else {
Expand Down Expand Up @@ -200,6 +217,7 @@ func (t *treeTX) getSubtrees(ctx context.Context, treeRevision int64, ids [][]by

rows, err := stx.QueryContext(ctx, args...)
if err != nil {
t.ts.cleanAllStmt()
klog.Warningf("Failed to get merkle subtrees: %s", err)
return nil, err
}
Expand Down Expand Up @@ -296,6 +314,7 @@ func (t *treeTX) storeSubtrees(ctx context.Context, subtrees []*storagepb.Subtre

r, err := stx.ExecContext(ctx, args...)
if err != nil {
t.ts.cleanAllStmt()
klog.Warningf("Failed to set merkle subtrees: %s", err)
return err
}
Expand Down