diff --git a/go/libraries/doltcore/sqle/statsnoms/database.go b/go/libraries/doltcore/sqle/statsnoms/database.go index 01cdfa88b1..4e7be79a33 100644 --- a/go/libraries/doltcore/sqle/statsnoms/database.go +++ b/go/libraries/doltcore/sqle/statsnoms/database.go @@ -231,23 +231,26 @@ func (n *NomsStatsDatabase) initMutable(ctx context.Context, i int) error { return nil } -func (n *NomsStatsDatabase) DeleteStats(branch string, quals ...sql.StatQualifier) { +func (n *NomsStatsDatabase) DeleteStats(ctx *sql.Context, branch string, quals ...sql.StatQualifier) { n.mu.Lock() defer n.mu.Unlock() for i, b := range n.branches { if strings.EqualFold(b, branch) { for _, qual := range quals { + ctx.GetLogger().Debugf("statistics refresh: deleting index statistics: %s/%s", branch, qual) delete(n.stats[i], qual) } } } } -func (n *NomsStatsDatabase) DeleteBranchStats(ctx context.Context, branch string, flush bool) error { +func (n *NomsStatsDatabase) DeleteBranchStats(ctx *sql.Context, branch string, flush bool) error { n.mu.Lock() defer n.mu.Unlock() + ctx.GetLogger().Debugf("statistics refresh: deleting branch statistics: %s", branch) + for i, b := range n.branches { if strings.EqualFold(b, branch) { n.branches = append(n.branches[:i], n.branches[i+1:]...) diff --git a/go/libraries/doltcore/sqle/statspro/analyze.go b/go/libraries/doltcore/sqle/statspro/analyze.go index f5a8a58fb3..7da369e13a 100644 --- a/go/libraries/doltcore/sqle/statspro/analyze.go +++ b/go/libraries/doltcore/sqle/statspro/analyze.go @@ -84,10 +84,10 @@ func (p *Provider) BootstrapDatabaseStats(ctx *sql.Context, db string) error { } func (p *Provider) RefreshTableStatsWithBranch(ctx *sql.Context, table sql.Table, db string, branch string) error { - if !p.TryLockForUpdate(table.Name(), db, branch) { + if !p.TryLockForUpdate(branch, db, table.Name()) { return fmt.Errorf("already updating statistics") } - defer p.UnlockTable(table.Name(), db, branch) + defer p.UnlockTable(branch, db, table.Name()) dSess := dsess.DSessFromSess(ctx.Session) diff --git a/go/libraries/doltcore/sqle/statspro/auto_refresh.go b/go/libraries/doltcore/sqle/statspro/auto_refresh.go index e87e2d4677..7d51e9465c 100644 --- a/go/libraries/doltcore/sqle/statspro/auto_refresh.go +++ b/go/libraries/doltcore/sqle/statspro/auto_refresh.go @@ -107,10 +107,10 @@ func (p *Provider) InitAutoRefreshWithParams(ctxFactory func(ctx context.Context } func (p *Provider) checkRefresh(ctx *sql.Context, sqlDb sql.Database, dbName, branch string, updateThresh float64) error { - if !p.TryLockForUpdate("", dbName, branch) { - return nil + if !p.TryLockForUpdate(branch, dbName, "") { + return fmt.Errorf("database already being updated: %s/%s", branch, dbName) } - defer p.UnlockTable("", dbName, branch) + defer p.UnlockTable(branch, dbName, "") // Iterate all dbs, tables, indexes. Each db will collect // []indexMeta above refresh threshold. We read and process those @@ -133,10 +133,12 @@ func (p *Provider) checkRefresh(ctx *sql.Context, sqlDb sql.Database, dbName, br } for _, table := range tables { - if !p.TryLockForUpdate(table, dbName, branch) { - continue + if !p.TryLockForUpdate(branch, dbName, table) { + ctx.GetLogger().Debugf("statistics refresh: table is already being updated: %s/%s.%s", branch, dbName, table) + return fmt.Errorf("table already being updated: %s", table) } - defer p.UnlockTable(table, dbName, branch) + defer p.UnlockTable(branch, dbName, table) + sqlTable, dTab, err := GetLatestTable(ctx, table, sqlDb) if err != nil { return err @@ -240,7 +242,7 @@ func (p *Provider) checkRefresh(ctx *sql.Context, sqlDb sql.Database, dbName, br } } - statDb.DeleteStats(branch, deletedStats...) + statDb.DeleteStats(ctx, branch, deletedStats...) if err := statDb.Flush(ctx, branch); err != nil { return err diff --git a/go/libraries/doltcore/sqle/statspro/interface.go b/go/libraries/doltcore/sqle/statspro/interface.go index ae56b834b1..e88ef2e405 100644 --- a/go/libraries/doltcore/sqle/statspro/interface.go +++ b/go/libraries/doltcore/sqle/statspro/interface.go @@ -36,13 +36,13 @@ type Database interface { LoadBranchStats(ctx *sql.Context, branch string) error // DeleteBranchStats removes references to in memory index statistics. // If |flush| is true delete the data from storage. - DeleteBranchStats(ctx context.Context, branch string, flush bool) error + DeleteBranchStats(ctx *sql.Context, branch string, flush bool) error // GetStat returns a branch's index statistics. GetStat(branch string, qual sql.StatQualifier) (*DoltStats, bool) //SetStat bulk replaces the statistic, deleting any previous version SetStat(ctx context.Context, branch string, qual sql.StatQualifier, stats *DoltStats) error //DeleteStats deletes a list of index statistics. - DeleteStats(branch string, quals ...sql.StatQualifier) + DeleteStats(ctx *sql.Context, branch string, quals ...sql.StatQualifier) // ReplaceChunks is an update interface that lets a stats implementation // decide how to edit stats for a stats refresh. ReplaceChunks(ctx context.Context, branch string, qual sql.StatQualifier, targetHashes []hash.Hash, dropChunks, newChunks []sql.HistogramBucket) error diff --git a/go/libraries/doltcore/sqle/statspro/stats_provider.go b/go/libraries/doltcore/sqle/statspro/stats_provider.go index 4cf3e201c4..6e7ecd29be 100644 --- a/go/libraries/doltcore/sqle/statspro/stats_provider.go +++ b/go/libraries/doltcore/sqle/statspro/stats_provider.go @@ -94,10 +94,10 @@ func newDbStats(dbName string) *dbToStats { var _ sql.StatsProvider = (*Provider)(nil) -func (p *Provider) TryLockForUpdate(table string, db string, branch string) bool { +func (p *Provider) TryLockForUpdate(branch, db, table string) bool { p.mu.Lock() defer p.mu.Unlock() - lockId := fmt.Sprintf("%s.%s.%s", db, branch, table) + lockId := fmt.Sprintf("%s.%s.%s", branch, db, table) if ok := p.lockedTables[lockId]; ok { return false } @@ -105,10 +105,10 @@ func (p *Provider) TryLockForUpdate(table string, db string, branch string) bool return true } -func (p *Provider) UnlockTable(table string, db string, branch string) { +func (p *Provider) UnlockTable(branch, db, table string) { p.mu.Lock() defer p.mu.Unlock() - lockId := fmt.Sprintf("%s.%s.%s", db, branch, table) + lockId := fmt.Sprintf("%s.%s.%s", branch, db, table) p.lockedTables[lockId] = false return } @@ -279,7 +279,7 @@ func (p *Provider) DropStats(ctx *sql.Context, qual sql.StatQualifier, _ []strin } if _, ok := statDb.GetStat(branch, qual); ok { - statDb.DeleteStats(branch, qual) + statDb.DeleteStats(ctx, branch, qual) p.UpdateStatus(qual.Db(), fmt.Sprintf("dropped statisic: %s", qual.String())) }