[DC-1780] Add clustering and mixed insert/update mode (#27)
This PR adds the following functionality:
- Specify the number of distinct cluster values via `NUM_CLUSTERS` to test clustered collections
- Specify the percentage of documents that hit one specific cluster via `HOT_CLUSTER_PERCENTAGE` to simulate a hot cluster (see the sketch after this list)
- Add a `mixed` mode that randomly interleaves inserts and updates, configured via `UPDATE_PERCENTAGE` and `MAX_DOCS`
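As a rough sketch of how the two clustering knobs interact (the function name and key strings below are illustrative placeholders, not the exact values the generator emits): roughly `HOT_CLUSTER_PERCENTAGE` percent of documents land on one fixed hot key, and the remainder are spread uniformly across `NUM_CLUSTERS` distinct keys.

```go
package main

import (
	"fmt"
	"math/rand"
)

// pickClusterKey is a simplified stand-in for the cluster-key helper added
// in this PR: hotPct percent of calls return a single fixed "hot" key, and
// the rest are spread uniformly over numClusters distinct keys.
func pickClusterKey(numClusters, hotPct int) string {
	if hotPct > 0 && rand.Intn(100) < hotPct {
		return "cluster-hot"
	}
	return fmt.Sprintf("cluster-%d", rand.Intn(numClusters))
}

func main() {
	// With NUM_CLUSTERS=10 and HOT_CLUSTER_PERCENTAGE=50, about half the
	// documents should land on the hot key and the rest split roughly
	// evenly across cluster-0 .. cluster-9.
	counts := map[string]int{}
	for i := 0; i < 100000; i++ {
		counts[pickClusterKey(10, 50)]++
	}
	fmt.Println(counts)
}
```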

Mixed mode only works well when a single Kubernetes replica is doing the work: the max doc id, n, is tracked locally, so documents with ids 0 to n become updates and n+1 becomes the next insert, as illustrated in the sketch below. An alternative that would work with multiple replicas is to use a UUID for every insert and, for updates, choose an id from 0 to n. That approach would require the collection to be populated with docs 0 to n first, and any newly inserted UUID documents would never be updated.
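A minimal sketch of that single-replica bookkeeping, simplified from the `GenerateDoc` change below (the helper name and the way the counter is seeded are illustrative):

```go
package main

import (
	"fmt"
	"math/rand"
)

// maxDocID is the highest doc id handed out so far, seeded from MAX_DOCS.
// Because it lives in process memory, this scheme only behaves as described
// when a single replica is generating documents.
var maxDocID = 5 // e.g. MAX_DOCS=5: ids 0..4 already exist in the collection

// nextDocID decides, per document, whether to reuse an existing id (update)
// or hand out a brand-new one (insert), based on updatePct (UPDATE_PERCENTAGE).
func nextDocID(updatePct int) (id int, isUpdate bool) {
	if rand.Intn(100) < updatePct {
		return rand.Intn(maxDocID), true // existing id -> update
	}
	id = maxDocID // next unused id -> insert
	maxDocID++
	return id, false
}

func main() {
	for i := 0; i < 10; i++ {
		id, isUpdate := nextDocID(30) // ~30% of documents become updates
		fmt.Printf("doc id %d, update=%v\n", id, isUpdate)
	}
}
```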
richardlin047 authored Jun 21, 2023
1 parent 3cf72f2 commit 51fc3f0
Showing 4 changed files with 118 additions and 18 deletions.
62 changes: 50 additions & 12 deletions generator/document.go
@@ -10,6 +10,17 @@ import (
guuid "github.com/google/uuid"
)

type DocumentSpec struct {
Destination string
GeneratorIdentifier string
BatchSize int
Mode string
IdMode string
UpdatePercentage int
NumClusters int
HotClusterPercentage int
}

type DocStruct struct {
Guid string
IsActive bool
@@ -59,8 +70,9 @@ type FriendDetailsStruct struct {
}

var doc_id = 0
var max_doc_id = 0

func GenerateDoc(destination, identifier string, idMode string) (interface{}, error) {
func GenerateDoc(spec DocumentSpec) (interface{}, error) {
docStruct := DocStruct{}
err := faker.FakeData(&docStruct)
if err != nil {
@@ -74,43 +86,69 @@ func GenerateDoc(destination, identifier string, idMode string) (interface{}, er
return nil, fmt.Errorf("failed to unmarshal document: %w", err)
}

if destination == "rockset" {
if idMode == "uuid" {
if spec.Destination == "rockset" {
if spec.Mode == "mixed" {
// Randomly choose a number to decide whether to generate a doc with an existing doc id
if rand.Intn(100) < spec.UpdatePercentage {
// Choose random id from one already existing doc id
doc["_id"] = formatDocId(rand.Intn(getMaxDoc()))
} else {
doc["_id"] = formatDocId(getMaxDoc())
SetMaxDoc(getMaxDoc()+1)
}
doc_id = doc_id + 1
// All other modes
} else if spec.IdMode == "uuid" {
doc["_id"] = guuid.New().String()
} else {
} else if spec.IdMode == "sequential" {
doc["_id"] = formatDocId(doc_id)
doc_id = doc_id + 1
} else {
panic(fmt.Sprintf("Unsupported generateDoc case: %s", spec.IdMode))
}
}

if spec.NumClusters > 0 {
doc["cluster1"] = getClusterKey(spec.NumClusters, spec.HotClusterPercentage)
}

doc["_event_time"] = CurrentTimeMicros()
// Set _ts as _event_time is not mutable
doc["_ts"] = CurrentTimeMicros()
doc["generator_identifier"] = identifier
doc["generator_identifier"] = spec.GeneratorIdentifier

return doc, nil
}

func getClusterKey(numClusters int, hotClusterPercentage int) string {
if hotClusterPercentage > 0 && rand.Intn(100) < hotClusterPercentage {
return "[email protected]"
} else {
return fmt.Sprintf("%[email protected]", rand.Intn(numClusters))
}
}

func getMaxDoc() int {
// doc_ids are left padded monotonic integers,
// this returns the highest exclusive doc id for purposes of issuing patches.
return doc_id
return max_doc_id
}

func SetMaxDoc(maxDocId int) {
doc_id = maxDocId
// doc_id = maxDocId
max_doc_id = maxDocId
}

func CurrentTimeMicros() int64 {
t := time.Now()
return int64(time.Nanosecond) * t.UnixNano() / int64(time.Microsecond)
}

func GenerateDocs(batchSize int, destination, identifier string, idMode string) ([]interface{}, error) {
var docs = make([]interface{}, batchSize, batchSize)
func GenerateDocs(spec DocumentSpec) ([]interface{}, error) {
var docs = make([]interface{}, spec.BatchSize, spec.BatchSize)

for i := 0; i < batchSize; i++ {
doc, err := GenerateDoc(destination, identifier, idMode)
for i := 0; i < spec.BatchSize; i++ {
doc, err := GenerateDoc(spec)
if err != nil {
return nil, err
}
@@ -262,7 +300,7 @@ func genUniqueInRange(limit int, count int) []int {

ids := make([]int, count)
i := 0
for k, _ := range ids_to_patch {
for k := range ids_to_patch {
ids[i] = k
i++
}
12 changes: 11 additions & 1 deletion generator/elastic_test.go
@@ -42,8 +42,18 @@ func TestElastic_GetLatestTimestamp(t *testing.T) {

func TestElastic_SendDocument(t *testing.T) {
r := NewElasticClient("")
spec := DocumentSpec{
Destination: "elastic",
GeneratorIdentifier: r.GeneratorIdentifier,
BatchSize: 10,
Mode: "add",
IdMode: "sequential",
UpdatePercentage: -1,
NumClusters: -1,
HotClusterPercentage: -1,
}

docs, err := GenerateDocs(10, "Elastic", r.GeneratorIdentifier, "sequential")
docs, err := GenerateDocs(spec)
assert.Nil(t, err)
err = r.SendDocument(docs)
assert.Nil(t, err)
12 changes: 11 additions & 1 deletion generator/rockset_test.go
@@ -57,8 +57,18 @@ func TestRockset_GetLatestTimestamp(t *testing.T) {

func TestRockset_SendDocument(t *testing.T) {
r := NewRocksetClient("")
spec := DocumentSpec{
Destination: "rockset",
GeneratorIdentifier: r.GeneratorIdentifier,
BatchSize: 10,
Mode: "add",
IdMode: "uuid",
UpdatePercentage: -1,
NumClusters: -1,
HotClusterPercentage: -1,
}

docs, err := GenerateDocs(10, "Rockset", r.GeneratorIdentifier, "uuid")
docs, err := GenerateDocs(spec)
assert.Nil(t, err)
err = r.SendDocument(docs)
assert.Nil(t, err)
50 changes: 46 additions & 4 deletions main.go
@@ -24,6 +24,7 @@ func main() {
batchSize := mustGetEnvInt("BATCH_SIZE")
destination := strings.ToLower(mustGetEnvString("DESTINATION"))
numDocs := getEnvDefaultInt("NUM_DOCS", -1)
maxDocs := getEnvDefaultInt("MAX_DOCS", -1) // Used to track the known max doc id for upserts to update existing collections
mode := getEnvDefault("MODE", "add")
idMode := getEnvDefault("ID_MODE", "uuid")
patchMode := getEnvDefault("PATCH_MODE", "replace")
@@ -35,11 +36,18 @@
replicas := getEnvDefaultInt("REPLICAS", 1)
promPort := getEnvDefaultInt("PROM_PORT", 9161)

// Mixed mode related settings
updatePercentage := getEnvDefaultInt("UPDATE_PERCENTAGE", -1) // Percentage of documents that update existing documents

// Clustering related settings
numClusters := getEnvDefaultInt("NUM_CLUSTERS", -1) // Number of distinct values for the cluster key
hotClusterPercentage := getEnvDefaultInt("HOT_CLUSTER_PERCENTAGE", -1) // Percentage of inserts/updates that go to a single cluster key; the remainder is uniformly distributed

if !(patchMode == "replace" || patchMode == "add") {
panic("Invalid patch mode specified, expecting either 'replace' or 'add'")
}
if !(mode == "add" || mode == "patch" || mode == "add_then_patch") {
panic("Invalid mode specified, expecting one of 'add', 'patch', 'add_then_patch'")
if !(mode == "add" || mode == "patch" || mode == "add_then_patch" || mode == "mixed") {
panic("Invalid mode specified, expecting one of 'add', 'patch', 'add_then_patch', 'mixed'")
}
if !(idMode == "uuid" || idMode == "sequential") {
panic("Invalid idMode specified, expecting 'uuid' or 'sequential'")
@@ -53,6 +61,26 @@
panic("Patch mode requires a positive number of docs to perform patches against. Please specify a number of documents via NUM_DOCS env var.")
}

if mode == "mixed" {
if idMode != "sequential" {
panic("`mixed` MODE supports ID_MODE `sequential` only")
}
if updatePercentage < 0 || updatePercentage > 100 {
panic("`mixed` MODE requires a positive number between 0 and 100. Please specify the percentage of documents to be updates via UPDATE_PERCENTAGE env var")
}
if maxDocs <= 0 {
panic("`mixed` MODE requires a positive number for MAX_DOCS. This tracks the maximum doc id in the collection and can be used to continue adding document ids sequentially. If no documents exist, specify 1")
}
}

if hotClusterPercentage > 0 && numClusters < 0 {
panic("NUM_CLUSTERS must be specified if HOT_CLUSTER_PERCENTAGE is provided.")
}

if hotClusterPercentage == 0 || hotClusterPercentage > 100 || numClusters == 0 {
panic("NUM_CLUSTERS must be a positive number and HOT_CLUSTER_PERCENTAGE must be greater than 0 and less than or equal to 100 if specified.")
}

pps := getEnvDefaultInt("PPS", wps)
defaultRoundTripper := http.DefaultTransport
defaultTransportPointer, ok := defaultRoundTripper.(*http.Transport)
@@ -67,6 +95,17 @@
generatorIdentifier := generator.RandomString(10)
fmt.Println("Generator identifier: ", generatorIdentifier)

documentSpec := generator.DocumentSpec{
Destination: destination,
GeneratorIdentifier: generatorIdentifier,
BatchSize: batchSize,
Mode: mode,
IdMode: idMode,
UpdatePercentage: updatePercentage,
NumClusters: numClusters,
HotClusterPercentage: hotClusterPercentage,
}

var d generator.Destination

switch destination {
@@ -177,7 +216,10 @@ func main() {
docs_written := 0
t := time.NewTicker(time.Second)
defer t.Stop()
if mode == "add_then_patch" || mode == "add" {
if mode == "add_then_patch" || mode == "add" || mode == "mixed" {
if mode == "mixed" {
generator.SetMaxDoc(maxDocs)
}
for numDocs < 0 || docs_written < numDocs {
select {
// when doneChan is closed, receive immediately returns the zero value
@@ -187,7 +229,7 @@
case <-t.C:
for i := 0; i < wps; i++ {
// TODO: move doc generation out of this loop into a go routine that pre-generates them
docs, err := generator.GenerateDocs(batchSize, destination, generatorIdentifier, idMode)
docs, err := generator.GenerateDocs(documentSpec)
if err != nil {
log.Printf("document generation failed: %v", err)
os.Exit(1)
