diff --git a/generator/document.go b/generator/document.go index 71aa79d..de5ae1b 100644 --- a/generator/document.go +++ b/generator/document.go @@ -86,26 +86,24 @@ func GenerateDoc(spec DocumentSpec) (interface{}, error) { return nil, fmt.Errorf("failed to unmarshal document: %w", err) } - if spec.Destination == "rockset" { - if spec.Mode == "mixed" { - // Randomly choose a number to decide whether to generate a doc with an existing doc id - if rand.Intn(100) < spec.UpdatePercentage { - // Choose random id from one already existing doc id - doc["_id"] = formatDocId(rand.Intn(getMaxDoc())) - } else { - doc["_id"] = formatDocId(getMaxDoc()) - SetMaxDoc(getMaxDoc()+1) - } - doc_id = doc_id + 1 - // All other modes - } else if spec.IdMode == "uuid" { - doc["_id"] = guuid.New().String() - } else if spec.IdMode == "sequential"{ - doc["_id"] = formatDocId(doc_id) - doc_id = doc_id + 1 + if spec.Mode == "mixed" { + // Randomly choose a number to decide whether to generate a doc with an existing doc id + if rand.Intn(100) < spec.UpdatePercentage { + // Choose random id from one already existing doc id + doc["_id"] = formatDocId(rand.Intn(getMaxDoc())) } else { - panic(fmt.Sprintf("Unsupported generateDoc case: %s", spec.IdMode)) + doc["_id"] = formatDocId(getMaxDoc()) + SetMaxDoc(getMaxDoc()+1) } + doc_id = doc_id + 1 + // All other modes + } else if spec.IdMode == "uuid" { + doc["_id"] = guuid.New().String() + } else if spec.IdMode == "sequential" { + doc["_id"] = formatDocId(doc_id) + doc_id = doc_id + 1 + } else { + panic(fmt.Sprintf("Unsupported generateDoc case: %s", spec.IdMode)) } if spec.NumClusters > 0 { @@ -168,46 +166,71 @@ func RandomString(n int) string { return string(s) } -func GeneratePatches(num_patch int, c chan map[string]interface{}) ([]interface{}, error) { - ids_to_patch := genUniqueInRange(getMaxDoc(), num_patch) +func GeneratePatches(num_patch int, destination string, c chan map[string]interface{}) ([]interface{}, error) { patches := make([]interface{}, 0) + ids_to_patch := genUniqueInRange(getMaxDoc(), num_patch) for _, id := range ids_to_patch { - patch := generatePatch(id, <-c) - patches = append(patches, patch) - } + if (destination == "elastic") { + patch := generateElasticPatch(id, <-c) + patches = append(patches, patch) + } else if (destination == "rockset") { + patch := generateRocksetPatch(id, <-c) + patches = append(patches, patch) + } + } return patches, nil } -func RandomFieldAdd(c chan map[string]interface{}) { +func RandomFieldAdd(destination string, c chan map[string]interface{}) { // Adding fields or array members for { - options := []map[string]interface{}{{ - "op": "add", - "path": "/" + faker.UUIDDigit(), - "value": faker.Email(), - }, - { + if (destination == "rockset") { + options := []map[string]interface{}{{ + "op": "add", + "path": "/" + faker.UUIDDigit(), + "value": faker.Email(), + }, + { "op": "add", "path": "/Tags/-", "value": faker.UUIDHyphenated(), // Append to tags array - }, + }, + } + shuffleAndFillChannel(options, c) + } else if (destination == "elastic") { + options := []map[string]interface{}{{ + "doc": map[string]interface{}{ + faker.UUIDDigit(): faker.Email(), + "_ts": CurrentTimeMicros(), + }, + }, + { + "script": map[string]interface{}{ + "source": "ctx._source.Tags.add(params.tag)", + "params": map[string]interface{}{ + "tag": faker.UUIDHyphenated(), + "_ts": CurrentTimeMicros(), + }, + }, + }, + } + shuffleAndFillChannel(options, c) } - shuffleAndFillChannel(options, c) } - } -func RandomFieldReplace(c chan map[string]interface{}) { +func RandomFieldReplace(destination string, c chan map[string]interface{}) { // Purely replacement of fields random := rand.New(rand.NewSource(time.Now().UnixNano())) for { - options := []map[string]interface{}{{ - "op": "replace", - "path": "/Email", - "value": faker.Email(), - }, + if (destination == "rockset") { + options := []map[string]interface{}{{ + "op": "replace", + "path": "/Email", + "value": faker.Email(), + }, { "op": "replace", "path": "/About", @@ -283,7 +306,124 @@ func RandomFieldReplace(c chan map[string]interface{}) { "path": "/Address/City", "value": faker.Word(), }} - shuffleAndFillChannel(options, c) + shuffleAndFillChannel(options, c) + } else if (destination == "elastic") { + options := []map[string]interface{}{{ + "doc": map[string]interface{}{ + "Email": faker.Email(), + "_ts": CurrentTimeMicros(), + }, + }, + { + "doc": map[string]interface{}{ + "About": faker.Sentence(), + "_ts": CurrentTimeMicros(), + }, + }, + { + "doc": map[string]interface{}{ + "Company": faker.Word() + "-" + faker.Word(), + "_ts": CurrentTimeMicros(), + }, + }, + { + "script": map[string]interface{}{ + "source": "ctx._source.Name.First = params.updated_nested_first_name; ctx._source._ts = params.ts", + "params" : map[string]interface{}{ + "updated_nested_first_name" : faker.FirstName(), + "ts": CurrentTimeMicros(), + }, + }, + }, + { + "script": map[string]interface{}{ + "source": "ctx._source.Name.Last = params.updated_nested_last_name; ctx._source._ts = params.ts", + "params" : map[string]interface{}{ + "updated_nested_last_name": faker.LastName(), + "ts": CurrentTimeMicros(), + }, + }, + }, + { + "doc": map[string]interface{}{ + "Age": random.Intn(100), + "_ts": CurrentTimeMicros(), + }, + }, + { + "doc": map[string]interface{}{ + "Balance": random.Float64(), + "_ts": CurrentTimeMicros(), + }, + }, + { + "doc": map[string]interface{}{ + "Registered": faker.Timestamp(), + "_ts": CurrentTimeMicros(), + }, + }, + { + "doc": map[string]interface{}{ + "Phone": faker.Phonenumber(), + "_ts": CurrentTimeMicros(), + }, + }, + { + "doc": map[string]interface{}{ + "Picture": faker.UUIDDigit(), + "_ts": CurrentTimeMicros(), + }, + }, + { + "doc": map[string]interface{}{ + "Guid": faker.UUIDHyphenated(), + "_ts": CurrentTimeMicros(), + }, + }, + { + "doc": map[string]interface{}{ + "Greeting": faker.Paragraph(), + "_ts": CurrentTimeMicros(), + }, + }, + { + "script": map[string]interface{}{ + "source": "ctx._source.Address.ZipCode = params.updated_nested_zcode; ctx._source._ts = params.ts", + "params" : map[string]interface{}{ + "updated_nested_zcode": random.Intn(100000), + "ts": CurrentTimeMicros(), + }, + }, + }, + { + "script": map[string]interface{}{ + "source": "ctx._source.Address.Coordinates.Longitude = params.updated_nested_coord_long; ctx._source._ts = params.ts", + "params" : map[string]interface{}{ + "updated_nested_coord_long": faker.Longitude(), + "ts": CurrentTimeMicros(), + }, + }, + }, + { + "script": map[string]interface{}{ + "source": "ctx._source.Address.Coordinates.Latitude = params.updated_nested_coord_lat; ctx._source._ts = params.ts", + "params" : map[string]interface{}{ + "updated_nested_coord_lat": faker.Latitude(), + "ts": CurrentTimeMicros(), + }, + }, + }, + { + "script": map[string]interface{}{ + "source": "ctx._source.Address.City = params.updated_nested_city; ctx._source._ts = params.ts", + "params" : map[string]interface{}{ + "updated_nested_city": faker.Word(), + "ts": CurrentTimeMicros(), + }, + }, + }} + shuffleAndFillChannel(options, c) + } } } @@ -307,7 +447,7 @@ func genUniqueInRange(limit int, count int) []int { return ids } -func generatePatch(id int, field_patch map[string]interface{}) map[string]interface{} { +func generateRocksetPatch(id int, field_patch map[string]interface{}) map[string]interface{} { patch := make(map[string]interface{}) patch["_id"] = formatDocId(id) add_op := []map[string]interface{}{field_patch, {"op": "add", "path": "/_ts", "value": CurrentTimeMicros()}} @@ -315,6 +455,13 @@ func generatePatch(id int, field_patch map[string]interface{}) map[string]interf return patch } +func generateElasticPatch(id int, field_patch map[string]interface{}) map[string]interface{} { + patch := make(map[string]interface{}) + patch["_id"] = formatDocId(id) + patch["patch"] = field_patch + return patch +} + func shuffleAndFillChannel(options []map[string]interface{}, c chan map[string]interface{}) { rand.Shuffle(len(options), func(i, j int) { options[i], options[j] = options[j], options[i] diff --git a/generator/elastic.go b/generator/elastic.go index 8f93ee5..b670e5c 100644 --- a/generator/elastic.go +++ b/generator/elastic.go @@ -9,8 +9,6 @@ import ( "net/http" "strings" "time" - - guuid "github.com/google/uuid" ) // Elastic contains all configurations needed to send documents to Elastic @@ -22,25 +20,87 @@ type Elastic struct { GeneratorIdentifier string } -func (j *Elastic) SendPatch(docs []interface{}) error { - //TODO implement me - panic("implement me") +func (e *Elastic) SendPatch(docs []interface{}) error { + numDocs := len(docs) + numEventIngested.Add(float64(numDocs)) + var builder bytes.Buffer + for i := 0; i < len(docs); i++ { + mdoc, errb := docs[i].(map[string]interface{}) + if !errb { + return fmt.Errorf("document is not a map of string to interface") + } + + index := make(map[string]interface{}) + index["_index"] = e.IndexName + index["_id"] = mdoc["_id"] + + line, err := json.Marshal(mdoc["patch"]) + if err != nil { + return fmt.Errorf("failed to marshal document: %w", err) + } + + ret := make(map[string]interface{}) + ret["update"] = index + metaLine, err := json.Marshal(ret) + if err != nil { + return fmt.Errorf("failed to marshal request: %w", err) + } + + builder.Write(metaLine) + builder.WriteByte('\n') + builder.Write(line) + builder.WriteByte('\n') + } + + body := builder.Bytes() + bulkURL := e.URL + "/_bulk" + elasticHTTPRequest, _ := http.NewRequest(http.MethodPost, bulkURL, bytes.NewBuffer(body)) + elasticHTTPRequest.Header.Add("Authorization", e.Auth) + elasticHTTPRequest.Header.Add("Content-Type", "application/x-ndjson") + + resp, err := e.Client.Do(elasticHTTPRequest) + if err != nil { + recordPatchesErrored(float64(numDocs)) + return fmt.Errorf("failed to send request: %w", err) + } + defer deferredErrorCloser(resp.Body) + + if resp.StatusCode != http.StatusOK { + recordPatchesErrored(float64(numDocs)) + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read response body: %w", err) + } + return fmt.Errorf("error code: %d, body: %s", resp.StatusCode, string(bodyBytes)) + } + // fmt.Println("doc sent!") + recordPatchesCompleted(float64(numDocs)) + return nil + } -// SendDocument sends a batch of documents to Rockset +// SendDocument sends a batch of documents to Elastic func (e *Elastic) SendDocument(docs []any) error { numDocs := len(docs) numEventIngested.Add(float64(numDocs)) var builder bytes.Buffer for i := 0; i < len(docs); i++ { - line, err := json.Marshal(docs[i]) - if err != nil { - return fmt.Errorf("failed to marshal document: %w", err) + mdoc, errb := docs[i].(map[string]interface{}) + if !errb { + return fmt.Errorf("document is not a map of string to interface") } index := make(map[string]interface{}) index["_index"] = e.IndexName - index["_id"] = guuid.New().String() + index["_id"] = mdoc["_id"] + // "_id" is not allowed in the doc + delete(mdoc, "_id") + + line, err := json.Marshal(mdoc) + if err != nil { + return fmt.Errorf("failed to marshal document: %w", err) + } + ret := make(map[string]interface{}) ret["index"] = index metaLine, err := json.Marshal(ret) diff --git a/main.go b/main.go index d88c9f1..fe071aa 100644 --- a/main.go +++ b/main.go @@ -251,15 +251,15 @@ func main() { // must explicitly set number of docs so updates are applied evenly across document keys generator.SetMaxDoc(numDocs) } - if destination != "rockset" { - panic("Patches can only be generated for Rockset at this time") + if destination != "rockset" && destination != "elastic" { + panic("Patches can only be generated for Rockset or elastic at this time") } patchChannel := make(chan map[string]interface{}, 1) log.Printf("Sending patches in '%s' mode", patchMode) if patchMode == "replace" { - go generator.RandomFieldReplace(patchChannel) + go generator.RandomFieldReplace(destination, patchChannel) } else { - go generator.RandomFieldAdd(patchChannel) + go generator.RandomFieldAdd(destination, patchChannel) } for { select { @@ -269,7 +269,7 @@ func main() { os.Exit(0) case <-t.C: for i := 0; i < pps; i++ { - docs, err := generator.GeneratePatches(batchSize, patchChannel) + docs, err := generator.GeneratePatches(batchSize, destination, patchChannel) if err != nil { log.Printf("patch generation failed: %v", err) os.Exit(1) diff --git a/rockbench b/rockbench new file mode 100755 index 0000000..0a9fb73 Binary files /dev/null and b/rockbench differ