Skip to content

Commit

Permalink
Atomic map added
Browse files Browse the repository at this point in the history
  • Loading branch information
tknie committed Oct 13, 2024
1 parent c13e72a commit f5b1735
Showing 1 changed file with 36 additions and 16 deletions.
52 changes: 36 additions & 16 deletions server/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"path/filepath"
"slices"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/tknie/clu"
Expand All @@ -28,23 +30,31 @@ import (
"github.com/tknie/services"
)

var dbList = make(map[string]*common.Reference)
var dbDictionary = make(map[string]*common.Reference)
type databaseRegister struct {
readCount uint64
reference *common.Reference
}

// dbDictionary map of hash to database registry entry
var dbDictionary = sync.Map{}

// dbTableMap map of database table and registry entry
var dbTableMap = sync.Map{}

// init config update tracker to tracke changes
func init() {
RegisterConfigUpdates(initRegister)
}

// initRegister initialize database register getting all current available database
func initRegister() {
log.Log.Debugf("Register databases")
initTableOfDatabases()
// loadTableOfDatabases()
log.Log.Debugf("Start table thread for databases")
go loadTableThread()
}

func loadTableThread() {
//uptimeTicker := time.NewTicker(5 * time.Second)
dateTicker := time.NewTicker(60 * time.Second)
loadTableOfDatabases()
for {
Expand All @@ -60,9 +70,11 @@ func databaseHash(dm *Database) string {
// Handles handle database
func Handles(dm *Database) (*common.Reference, error) {
dHash := databaseHash(dm)
if ref, ok := dbList[dHash]; ok {
if e, ok := dbDictionary.Load(dHash); ok {
regEntry := e.(*databaseRegister)
log.Log.Debugf("Found database hash %s -> %s", dHash, os.ExpandEnv(dm.String()))
return ref, nil
atomic.AddUint64(&regEntry.readCount, 1)
return regEntry.reference, nil
}
log.Log.Infof("Add database hash %s -> %s", dHash, os.ExpandEnv(dm.String()))
target := os.ExpandEnv(dm.Target)
Expand All @@ -77,7 +89,7 @@ func Handles(dm *Database) (*common.Reference, error) {
services.ServerMessage("Error registering database <%s>: %v", dm.Target, err)
return nil, fmt.Errorf("error registering database")
}
dbList[dHash] = ref
dbDictionary.Store(dHash, &databaseRegister{reference: ref, readCount: 1})
for i := 0; i < len(dm.Tables); i++ {
dm.Tables[i] = strings.ToLower(dm.Tables[i])
}
Expand Down Expand Up @@ -123,24 +135,29 @@ func loadTableOfDatabases() {
newDatabases := make([]string, 0)
for _, table := range flynn.Maps() {
s := strings.ToLower(table)
if sid, ok := dbDictionary[s]; ok {
if sid != id {
if sid, ok := dbTableMap.Load(s); ok {
if sid.(*databaseRegister).reference != id {
services.ServerMessage("Found table on different databases: [%s]", s)
}
} else {
if checkFilter(dm.Tables, table) {
log.Log.Debugf("Append table: %s", table)
newDatabases = append(newDatabases, s)
dbDictionary[s] = id
dbTableMap.Store(s, &databaseRegister{reference: id})
} else {
log.Log.Debugf("Ignore table: %s", table)
}
}
}
if len(newDatabases) > 0 {
services.ServerMessage("Found table(s) to dictionary:\n%v", newDatabases)
services.ServerMessage("Collected %04d table(s) in dictionary", len(newDatabases))
}
}
dbTableMap.Range(func(key, value any) bool {
tableEntry := value.(*databaseRegister)
log.Log.Infof("Database with table %s count: %d", key, tableEntry.readCount)
return true
})

}

Expand All @@ -152,17 +169,20 @@ func InitDatabases() {
// GetAllViews get all table and view names
func GetAllViews() []string {
viewList := make([]string, 0)
for k := range dbDictionary {
viewList = append(viewList, k)
}
dbTableMap.Range(func(key, value any) bool {
viewList = append(viewList, key.(string))
return true
})
return viewList
}

// SearchTable search table ref ID
func SearchTable(table string) (*common.Reference, error) {
name := strings.ToLower(table)
if d, ok := dbDictionary[name]; ok {
return d, nil
if d, ok := dbTableMap.Load(name); ok {
dicEntry := d.(*databaseRegister)
atomic.AddUint64(&dicEntry.readCount, 1)
return dicEntry.reference, nil
}
return nil, errorrepo.NewError("RERR01000", table)
}
Expand Down

0 comments on commit f5b1735

Please sign in to comment.