Skip to content

Commit

Permalink
Merge pull request #74 from sei-protocol/ReadKvFilesUpdate
Browse files (browse the repository at this point in the history)
Add upper limit on file parsing goroutines
  • Loading branch information
Kbhat1 authored Sep 17, 2024
2 parents 22fde3c + 353d4ea commit b04b4ba
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 18 deletions.
8 changes: 4 additions & 4 deletions tools/dbbackend/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func writeToDBConcurrently(db types.StateStore, allKVs []utils.KeyValuePair, con
// Given an input dir containing all the raw kv data, it writes to the db one version after another
func BenchmarkDBWrite(db types.StateStore, inputKVDir string, numVersions int, concurrency int, batchSize int) {
startLoad := time.Now()
kvData, err := utils.LoadAndShuffleKV(inputKVDir)
kvData, err := utils.LoadAndShuffleKV(inputKVDir, concurrency)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func readFromDBConcurrently(db types.StateStore, allKVs []utils.KeyValuePair, nu
// BenchmarkDBRead measures random read performance of the db
// Given an input dir containing all the raw kv data, it generates random read load and measures performance.
func BenchmarkDBRead(db types.StateStore, inputKVDir string, numVersions int, concurrency int, maxOps int64) {
kvData, err := utils.LoadAndShuffleKV(inputKVDir)
kvData, err := utils.LoadAndShuffleKV(inputKVDir, concurrency)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -279,7 +279,7 @@ func forwardIterateDBConcurrently(db types.StateStore, allKVs []utils.KeyValuePa
// BenchmarkDBForwardIteration measures forward iteration performance of the db
// Given an input dir containing all the raw kv data, it selects a random key, forward iterates and measures performance.
func BenchmarkDBForwardIteration(db types.StateStore, inputKVDir string, numVersions int, concurrency int, maxOps int64, iterationSteps int) {
kvData, err := utils.LoadAndShuffleKV(inputKVDir)
kvData, err := utils.LoadAndShuffleKV(inputKVDir, concurrency)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -371,7 +371,7 @@ func reverseIterateDBConcurrently(db types.StateStore, allKVs []utils.KeyValuePa
// BenchmarkDBReverseIteration measures reverse iteration performance of the db
// Given an input dir containing all the raw kv data, it selects a random key, reverse iterates and measures performance.
func BenchmarkDBReverseIteration(db types.StateStore, inputKVDir string, numVersions int, concurrency int, maxOps int64, iterationSteps int) {
kvData, err := utils.LoadAndShuffleKV(inputKVDir)
kvData, err := utils.LoadAndShuffleKV(inputKVDir, concurrency)
if err != nil {
panic(err)
}
Expand Down
40 changes: 26 additions & 14 deletions tools/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,33 +220,45 @@ func ListAllFiles(dir string) ([]string, error) {
return fileNames, nil
}

func LoadAndShuffleKV(inputDir string) ([]KeyValuePair, error) {
func LoadAndShuffleKV(inputDir string, concurrency int) ([]KeyValuePair, error) {
var allKVs []KeyValuePair
mu := &sync.Mutex{}
wg := &sync.WaitGroup{}

allFiles, err := ListAllFiles(inputDir)
if err != nil {
log.Fatalf("Failed to list all files: %v", err)
}

for _, file := range allFiles {
filesChan := make(chan string)
wg := &sync.WaitGroup{}

// Start worker goroutines
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(id string, selectedFile string) {
go func() {
defer wg.Done()

kvEntries, err := ReadKVEntriesFromFile(filepath.Join(id, selectedFile))
if err != nil {
panic(err)
for selectedFile := range filesChan {
kvEntries, err := ReadKVEntriesFromFile(filepath.Join(inputDir, selectedFile))
if err != nil {
panic(err)
}

// Safely append the kvEntries to allKVs
mu.Lock()
allKVs = append(allKVs, kvEntries...)
fmt.Printf("Done processing file %+v\n", filepath.Join(inputDir, selectedFile))
mu.Unlock()
}
}()
}

// Safely append the kvEntries to allKVs
mu.Lock()
allKVs = append(allKVs, kvEntries...)
fmt.Printf("Done processing file %+v\n", filepath.Join(id, selectedFile))
mu.Unlock()
}(inputDir, file)
// Send file names to filesChan
for _, file := range allFiles {
filesChan <- file
}
close(filesChan)

// Wait for all workers to finish
wg.Wait()

rand.Shuffle(len(allKVs), func(i, j int) {
Expand Down

0 comments on commit b04b4ba

Please sign in to comment.