From 51fc3f07320776eac3ce694e1127b56eb8d5658d Mon Sep 17 00:00:00 2001 From: Richard Lin <35508487+richardlin047@users.noreply.github.com> Date: Wed, 21 Jun 2023 13:58:39 -0700 Subject: [PATCH] [DC-1780] Add clustering and mixed insert/update mode (#27) This PR adds the following functionality: - Specify the number of distinct cluster values to test clustered collections via `NUM_CLUSTERS` - Specify the percentage of documents hitting a specific cluster to simulate hot clustering via `HOT_CLUSTER_PERCENTAGE` - Add a mode that mixes inserts and updates randomly via `mixed` mode, `UPDATE_PERCENTAGE`, and `MAX_DOCS` Mixed mode only works well if there's one Kubernetes replica doing the work. The max doc id, n, is tracked locally so that any doc with an id from 0 to n is an update and a doc with id n+1 is an insert. The alternative that would work for multiple replicas is to use a UUID for inserts and to choose an id from 0 to n for updates. This means the collection would need to be populated with docs from 0 to n first, and any newly inserted docs with UUIDs would never be updated. 
--- generator/document.go | 62 +++++++++++++++++++++++++++++++-------- generator/elastic_test.go | 12 +++++++- generator/rockset_test.go | 12 +++++++- main.go | 50 ++++++++++++++++++++++++++++--- 4 files changed, 118 insertions(+), 18 deletions(-) diff --git a/generator/document.go b/generator/document.go index 7ee8537..71aa79d 100644 --- a/generator/document.go +++ b/generator/document.go @@ -10,6 +10,17 @@ import ( guuid "github.com/google/uuid" ) +type DocumentSpec struct { + Destination string + GeneratorIdentifier string + BatchSize int + Mode string + IdMode string + UpdatePercentage int + NumClusters int + HotClusterPercentage int +} + type DocStruct struct { Guid string IsActive bool @@ -59,8 +70,9 @@ type FriendDetailsStruct struct { } var doc_id = 0 +var max_doc_id = 0 -func GenerateDoc(destination, identifier string, idMode string) (interface{}, error) { +func GenerateDoc(spec DocumentSpec) (interface{}, error) { docStruct := DocStruct{} err := faker.FakeData(&docStruct) if err != nil { @@ -74,31 +86,57 @@ func GenerateDoc(destination, identifier string, idMode string) (interface{}, er return nil, fmt.Errorf("failed to unmarshal document: %w", err) } - if destination == "rockset" { - if idMode == "uuid" { + if spec.Destination == "rockset" { + if spec.Mode == "mixed" { + // Randomly choose a number to decide whether to generate a doc with an existing doc id + if rand.Intn(100) < spec.UpdatePercentage { + // Choose random id from one already existing doc id + doc["_id"] = formatDocId(rand.Intn(getMaxDoc())) + } else { + doc["_id"] = formatDocId(getMaxDoc()) + SetMaxDoc(getMaxDoc()+1) + } + doc_id = doc_id + 1 + // All other modes + } else if spec.IdMode == "uuid" { doc["_id"] = guuid.New().String() - } else { + } else if spec.IdMode == "sequential"{ doc["_id"] = formatDocId(doc_id) doc_id = doc_id + 1 + } else { + panic(fmt.Sprintf("Unsupported generateDoc case: %s", spec.IdMode)) } } + if spec.NumClusters > 0 { + doc["cluster1"] = 
getClusterKey(spec.NumClusters, spec.HotClusterPercentage) + } + doc["_event_time"] = CurrentTimeMicros() // Set _ts as _event_time is not mutable doc["_ts"] = CurrentTimeMicros() - doc["generator_identifier"] = identifier + doc["generator_identifier"] = spec.GeneratorIdentifier return doc, nil } +func getClusterKey(numClusters int, hotClusterPercentage int) string { + if hotClusterPercentage > 0 && rand.Intn(100) < hotClusterPercentage { + return "0@gmail.com" + } else { + return fmt.Sprintf("%d@gmail.com", rand.Intn(numClusters)) + } +} + func getMaxDoc() int { // doc_ids are left padded monotonic integers, //this returns the highest exclusive doc id for purposes of issuing patches. - return doc_id + return max_doc_id } func SetMaxDoc(maxDocId int) { - doc_id = maxDocId + // doc_id = maxDocId + max_doc_id = maxDocId } func CurrentTimeMicros() int64 { @@ -106,11 +144,11 @@ func CurrentTimeMicros() int64 { return int64(time.Nanosecond) * t.UnixNano() / int64(time.Microsecond) } -func GenerateDocs(batchSize int, destination, identifier string, idMode string) ([]interface{}, error) { - var docs = make([]interface{}, batchSize, batchSize) +func GenerateDocs(spec DocumentSpec) ([]interface{}, error) { + var docs = make([]interface{}, spec.BatchSize, spec.BatchSize) - for i := 0; i < batchSize; i++ { - doc, err := GenerateDoc(destination, identifier, idMode) + for i := 0; i < spec.BatchSize; i++ { + doc, err := GenerateDoc(spec) if err != nil { return nil, err } @@ -262,7 +300,7 @@ func genUniqueInRange(limit int, count int) []int { ids := make([]int, count) i := 0 - for k, _ := range ids_to_patch { + for k := range ids_to_patch { ids[i] = k i++ } diff --git a/generator/elastic_test.go b/generator/elastic_test.go index 3979abd..a2d4066 100644 --- a/generator/elastic_test.go +++ b/generator/elastic_test.go @@ -42,8 +42,18 @@ func TestElastic_GetLatestTimestamp(t *testing.T) { func TestElastic_SendDocument(t *testing.T) { r := NewElasticClient("") + spec := DocumentSpec{ 
+ Destination: "elastic", + GeneratorIdentifier: r.GeneratorIdentifier, + BatchSize: 10, + Mode: "add", + IdMode: "sequential", + UpdatePercentage: -1, + NumClusters: -1, + HotClusterPercentage: -1, + }; - docs, err := GenerateDocs(10, "Elastic", r.GeneratorIdentifier, "sequential") + docs, err := GenerateDocs(spec) assert.Nil(t, err) err = r.SendDocument(docs) assert.Nil(t, err) diff --git a/generator/rockset_test.go b/generator/rockset_test.go index 2e0ec7b..3b63fef 100644 --- a/generator/rockset_test.go +++ b/generator/rockset_test.go @@ -57,8 +57,18 @@ func TestRockset_GetLatestTimestamp(t *testing.T) { func TestRockset_SendDocument(t *testing.T) { r := NewRocksetClient("") + spec := DocumentSpec{ + Destination: "rockset", + GeneratorIdentifier: r.GeneratorIdentifier, + BatchSize: 10, + Mode: "add", + IdMode: "uuid", + UpdatePercentage: -1, + NumClusters: -1, + HotClusterPercentage: -1, + }; - docs, err := GenerateDocs(10, "Rockset", r.GeneratorIdentifier, "uuid") + docs, err := GenerateDocs(spec) assert.Nil(t, err) err = r.SendDocument(docs) assert.Nil(t, err) diff --git a/main.go b/main.go index afd3db9..d88c9f1 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ func main() { batchSize := mustGetEnvInt("BATCH_SIZE") destination := strings.ToLower(mustGetEnvString("DESTINATION")) numDocs := getEnvDefaultInt("NUM_DOCS", -1) + maxDocs := getEnvDefaultInt("MAX_DOCS", -1) // Used to track the known max doc id for upserts to update existing collections mode := getEnvDefault("MODE", "add") idMode := getEnvDefault("ID_MODE", "uuid") patchMode := getEnvDefault("PATCH_MODE", "replace") @@ -35,11 +36,18 @@ func main() { replicas := getEnvDefaultInt("REPLICAS", 1) promPort := getEnvDefaultInt("PROM_PORT", 9161) + // Mixed mode related settings + updatePercentage := getEnvDefaultInt("UPDATE_PERCENTAGE", -1) // Percentage of documents that update existing documents + + // Clustering related settings + numClusters := getEnvDefaultInt("NUM_CLUSTERS", -1) // Number of 
distinct values for the cluster key + hotClusterPercentage := getEnvDefaultInt("HOT_CLUSTER_PERCENTAGE", -1) // Percentage of inserts/updates that go to single cluster key. Remaining percentage is uniformly distributed + if !(patchMode == "replace" || patchMode == "add") { panic("Invalid patch mode specified, expecting either 'replace' or 'add'") } - if !(mode == "add" || mode == "patch" || mode == "add_then_patch") { - panic("Invalid mode specified, expecting one of 'add', 'patch', 'add_then_patch'") + if !(mode == "add" || mode == "patch" || mode == "add_then_patch" || mode == "mixed") { + panic("Invalid mode specified, expecting one of 'add', 'patch', 'add_then_patch', 'mixed'") } if !(idMode == "uuid" || idMode == "sequential") { panic("Invalid idMode specified, expecting 'uuid' or 'sequential'") @@ -53,6 +61,26 @@ func main() { panic("Patch mode requires a positive number of docs to perform patches against. Please specify a number of documents via NUM_DOCS env var.") } + if mode == "mixed" { + if idMode != "sequential" { + panic("`mixed` MODE supports ID_MODE `sequential` only") + } + if updatePercentage < 0 || updatePercentage > 100 { + panic("`mixed` MODE requires a positive number between 0 and 100. Please specify the percentage of documents to be updates via UPDATE_PERCENTAGE env var") + } + if maxDocs <= 0 { + panic("`mixed` MODE requires a positive number for MAX_DOCS. This tracks the maximum doc id in the collection and can be used to continue adding document ids sequentially. 
If no documents exist, specify 1") + } + } + + if hotClusterPercentage > 0 && numClusters < 0 { + panic("NUM_CLUSTERS must be specified if HOT_CLUSTER_PERCENTAGE is provided.") + } + + if hotClusterPercentage == 0 || hotClusterPercentage > 100 || numClusters == 0 { + panic("NUM_CLUSTERS must be a positive number and HOT_CLUSTER_PERCENTAGE must be greater than 0 and less than or equal to 100 if specified.") + } + pps := getEnvDefaultInt("PPS", wps) defaultRoundTripper := http.DefaultTransport defaultTransportPointer, ok := defaultRoundTripper.(*http.Transport) @@ -67,6 +95,17 @@ func main() { generatorIdentifier := generator.RandomString(10) fmt.Println("Generator identifier: ", generatorIdentifier) + documentSpec := generator.DocumentSpec{ + Destination: destination, + GeneratorIdentifier: generatorIdentifier, + BatchSize: batchSize, + Mode: mode, + IdMode: idMode, + UpdatePercentage: updatePercentage, + NumClusters: numClusters, + HotClusterPercentage: hotClusterPercentage, + } + var d generator.Destination switch destination { @@ -177,7 +216,10 @@ func main() { docs_written := 0 t := time.NewTicker(time.Second) defer t.Stop() - if mode == "add_then_patch" || mode == "add" { + if mode == "add_then_patch" || mode == "add" || mode == "mixed" { + if mode == "mixed" { + generator.SetMaxDoc(maxDocs) + } for numDocs < 0 || docs_written < numDocs { select { // when doneChan is closed, receive immediately returns the zero value @@ -187,7 +229,7 @@ func main() { case <-t.C: for i := 0; i < wps; i++ { // TODO: move doc generation out of this loop into a go routine that pre-generates them - docs, err := generator.GenerateDocs(batchSize, destination, generatorIdentifier, idMode) + docs, err := generator.GenerateDocs(documentSpec) if err != nil { log.Printf("document generation failed: %v", err) os.Exit(1)