Skip to content

Commit

Permalink
patch
Browse files Browse the repository at this point in the history
  • Loading branch information
kwadhwa18 committed Jul 19, 2023
1 parent 51fc3f0 commit 7862897
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 56 deletions.
229 changes: 188 additions & 41 deletions generator/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,24 @@ func GenerateDoc(spec DocumentSpec) (interface{}, error) {
return nil, fmt.Errorf("failed to unmarshal document: %w", err)
}

if spec.Destination == "rockset" {
if spec.Mode == "mixed" {
// Randomly choose a number to decide whether to generate a doc with an existing doc id
if rand.Intn(100) < spec.UpdatePercentage {
// Choose random id from one already existing doc id
doc["_id"] = formatDocId(rand.Intn(getMaxDoc()))
} else {
doc["_id"] = formatDocId(getMaxDoc())
SetMaxDoc(getMaxDoc()+1)
}
doc_id = doc_id + 1
// All other modes
} else if spec.IdMode == "uuid" {
doc["_id"] = guuid.New().String()
} else if spec.IdMode == "sequential"{
doc["_id"] = formatDocId(doc_id)
doc_id = doc_id + 1
if spec.Mode == "mixed" {
// Randomly choose a number to decide whether to generate a doc with an existing doc id
if rand.Intn(100) < spec.UpdatePercentage {
// Choose random id from one already existing doc id
doc["_id"] = formatDocId(rand.Intn(getMaxDoc()))
} else {
panic(fmt.Sprintf("Unsupported generateDoc case: %s", spec.IdMode))
doc["_id"] = formatDocId(getMaxDoc())
SetMaxDoc(getMaxDoc()+1)
}
doc_id = doc_id + 1
// All other modes
} else if spec.IdMode == "uuid" {
doc["_id"] = guuid.New().String()
} else if spec.IdMode == "sequential" {
doc["_id"] = formatDocId(doc_id)
doc_id = doc_id + 1
} else {
panic(fmt.Sprintf("Unsupported generateDoc case: %s", spec.IdMode))
}

if spec.NumClusters > 0 {
Expand Down Expand Up @@ -168,46 +166,71 @@ func RandomString(n int) string {
return string(s)
}

func GeneratePatches(num_patch int, c chan map[string]interface{}) ([]interface{}, error) {
ids_to_patch := genUniqueInRange(getMaxDoc(), num_patch)
func GeneratePatches(num_patch int, destination string, c chan map[string]interface{}) ([]interface{}, error) {
patches := make([]interface{}, 0)

ids_to_patch := genUniqueInRange(getMaxDoc(), num_patch)
for _, id := range ids_to_patch {
patch := generatePatch(id, <-c)
patches = append(patches, patch)
}
if (destination == "elastic") {
patch := generateElasticPatch(id, <-c)
patches = append(patches, patch)

} else if (destination == "rockset") {
patch := generateRocksetPatch(id, <-c)
patches = append(patches, patch)
}
}
return patches, nil
}

func RandomFieldAdd(c chan map[string]interface{}) {
func RandomFieldAdd(destination string, c chan map[string]interface{}) {
// Adding fields or array members
for {
options := []map[string]interface{}{{
"op": "add",
"path": "/" + faker.UUIDDigit(),
"value": faker.Email(),
},
{
if (destination == "rockset") {
options := []map[string]interface{}{{
"op": "add",
"path": "/" + faker.UUIDDigit(),
"value": faker.Email(),
},
{
"op": "add",
"path": "/Tags/-",
"value": faker.UUIDHyphenated(), // Append to tags array
},
},
}
shuffleAndFillChannel(options, c)
} else if (destination == "elastic") {
options := []map[string]interface{}{{
"doc": map[string]interface{}{
faker.UUIDDigit(): faker.Email(),
"_ts": CurrentTimeMicros(),
},
},
{
"script": map[string]interface{}{
"source": "ctx._source.Tags.add(params.tag)",
"params": map[string]interface{}{
"tag": faker.UUIDHyphenated(),
"_ts": CurrentTimeMicros(),
},
},
},
}
shuffleAndFillChannel(options, c)
}
shuffleAndFillChannel(options, c)
}

}

func RandomFieldReplace(c chan map[string]interface{}) {
func RandomFieldReplace(destination string, c chan map[string]interface{}) {
// Purely replacement of fields
random := rand.New(rand.NewSource(time.Now().UnixNano()))
for {
options := []map[string]interface{}{{
"op": "replace",
"path": "/Email",
"value": faker.Email(),
},
if (destination == "rockset") {
options := []map[string]interface{}{{
"op": "replace",
"path": "/Email",
"value": faker.Email(),
},
{
"op": "replace",
"path": "/About",
Expand Down Expand Up @@ -283,7 +306,124 @@ func RandomFieldReplace(c chan map[string]interface{}) {
"path": "/Address/City",
"value": faker.Word(),
}}
shuffleAndFillChannel(options, c)
shuffleAndFillChannel(options, c)
} else if (destination == "elastic") {
options := []map[string]interface{}{{
"doc": map[string]interface{}{
"Email": faker.Email(),
"_ts": CurrentTimeMicros(),
},
},
{
"doc": map[string]interface{}{
"About": faker.Sentence(),
"_ts": CurrentTimeMicros(),
},
},
{
"doc": map[string]interface{}{
"Company": faker.Word() + "-" + faker.Word(),
"_ts": CurrentTimeMicros(),
},
},
{
"script": map[string]interface{}{
"source": "ctx._source.Name.First = params.updated_nested_first_name; ctx._source._ts = params.ts",
"params" : map[string]interface{}{
"updated_nested_first_name" : faker.FirstName(),
"ts": CurrentTimeMicros(),
},
},
},
{
"script": map[string]interface{}{
"source": "ctx._source.Name.Last = params.updated_nested_last_name; ctx._source._ts = params.ts",
"params" : map[string]interface{}{
"updated_nested_last_name": faker.LastName(),
"ts": CurrentTimeMicros(),
},
},
},
{
"doc": map[string]interface{}{
"Age": random.Intn(100),
"_ts": CurrentTimeMicros(),
},
},
{
"doc": map[string]interface{}{
"Balance": random.Float64(),
"_ts": CurrentTimeMicros(),
},
},
{
"doc": map[string]interface{}{
"Registered": faker.Timestamp(),
"_ts": CurrentTimeMicros(),
},
},
{
"doc": map[string]interface{}{
"Phone": faker.Phonenumber(),
"_ts": CurrentTimeMicros(),
},
},
{
"doc": map[string]interface{}{
"Picture": faker.UUIDDigit(),
"_ts": CurrentTimeMicros(),
},
},
{
"doc": map[string]interface{}{
"Guid": faker.UUIDHyphenated(),
"_ts": CurrentTimeMicros(),
},
},
{
"doc": map[string]interface{}{
"Greeting": faker.Paragraph(),
"_ts": CurrentTimeMicros(),
},
},
{
"script": map[string]interface{}{
"source": "ctx._source.Address.ZipCode = params.updated_nested_zcode; ctx._source._ts = params.ts",
"params" : map[string]interface{}{
"updated_nested_zcode": random.Intn(100000),
"ts": CurrentTimeMicros(),
},
},
},
{
"script": map[string]interface{}{
"source": "ctx._source.Address.Coordinates.Longitude = params.updated_nested_coord_long; ctx._source._ts = params.ts",
"params" : map[string]interface{}{
"updated_nested_coord_long": faker.Longitude(),
"ts": CurrentTimeMicros(),
},
},
},
{
"script": map[string]interface{}{
"source": "ctx._source.Address.Coordinates.Latitude = params.updated_nested_coord_lat; ctx._source._ts = params.ts",
"params" : map[string]interface{}{
"updated_nested_coord_lat": faker.Latitude(),
"ts": CurrentTimeMicros(),
},
},
},
{
"script": map[string]interface{}{
"source": "ctx._source.Address.City = params.updated_nested_city; ctx._source._ts = params.ts",
"params" : map[string]interface{}{
"updated_nested_city": faker.Word(),
"ts": CurrentTimeMicros(),
},
},
}}
shuffleAndFillChannel(options, c)
}
}
}

Expand All @@ -307,14 +447,21 @@ func genUniqueInRange(limit int, count int) []int {
return ids
}

func generatePatch(id int, field_patch map[string]interface{}) map[string]interface{} {
func generateRocksetPatch(id int, field_patch map[string]interface{}) map[string]interface{} {
patch := make(map[string]interface{})
patch["_id"] = formatDocId(id)
add_op := []map[string]interface{}{field_patch, {"op": "add", "path": "/_ts", "value": CurrentTimeMicros()}}
patch["patch"] = add_op
return patch
}

func generateElasticPatch(id int, field_patch map[string]interface{}) map[string]interface{} {
patch := make(map[string]interface{})
patch["_id"] = formatDocId(id)
patch["patch"] = field_patch
return patch
}

func shuffleAndFillChannel(options []map[string]interface{}, c chan map[string]interface{}) {
rand.Shuffle(len(options), func(i, j int) {
options[i], options[j] = options[j], options[i]
Expand Down
80 changes: 70 additions & 10 deletions generator/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"net/http"
"strings"
"time"

guuid "github.com/google/uuid"
)

// Elastic contains all configurations needed to send documents to Elastic
Expand All @@ -22,25 +20,87 @@ type Elastic struct {
GeneratorIdentifier string
}

func (j *Elastic) SendPatch(docs []interface{}) error {
//TODO implement me
panic("implement me")
func (e *Elastic) SendPatch(docs []interface{}) error {
numDocs := len(docs)
numEventIngested.Add(float64(numDocs))
var builder bytes.Buffer
for i := 0; i < len(docs); i++ {
mdoc, errb := docs[i].(map[string]interface{})
if !errb {
return fmt.Errorf("document is not a map of string to interface")
}

index := make(map[string]interface{})
index["_index"] = e.IndexName
index["_id"] = mdoc["_id"]

line, err := json.Marshal(mdoc["patch"])
if err != nil {
return fmt.Errorf("failed to marshal document: %w", err)
}

ret := make(map[string]interface{})
ret["update"] = index
metaLine, err := json.Marshal(ret)
if err != nil {
return fmt.Errorf("failed to marshal request: %w", err)
}

builder.Write(metaLine)
builder.WriteByte('\n')
builder.Write(line)
builder.WriteByte('\n')
}

body := builder.Bytes()
bulkURL := e.URL + "/_bulk"
elasticHTTPRequest, _ := http.NewRequest(http.MethodPost, bulkURL, bytes.NewBuffer(body))
elasticHTTPRequest.Header.Add("Authorization", e.Auth)
elasticHTTPRequest.Header.Add("Content-Type", "application/x-ndjson")

resp, err := e.Client.Do(elasticHTTPRequest)
if err != nil {
recordPatchesErrored(float64(numDocs))
return fmt.Errorf("failed to send request: %w", err)
}
defer deferredErrorCloser(resp.Body)

if resp.StatusCode != http.StatusOK {
recordPatchesErrored(float64(numDocs))
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %w", err)
}
return fmt.Errorf("error code: %d, body: %s", resp.StatusCode, string(bodyBytes))
}
// fmt.Println("doc sent!")
recordPatchesCompleted(float64(numDocs))
return nil

}

// SendDocument sends a batch of documents to Rockset
// SendDocument sends a batch of documents to Elastic
func (e *Elastic) SendDocument(docs []any) error {
numDocs := len(docs)
numEventIngested.Add(float64(numDocs))
var builder bytes.Buffer
for i := 0; i < len(docs); i++ {
line, err := json.Marshal(docs[i])
if err != nil {
return fmt.Errorf("failed to marshal document: %w", err)
mdoc, errb := docs[i].(map[string]interface{})
if !errb {
return fmt.Errorf("document is not a map of string to interface")
}

index := make(map[string]interface{})
index["_index"] = e.IndexName
index["_id"] = guuid.New().String()
index["_id"] = mdoc["_id"]
// "_id" is not allowed in the doc
delete(mdoc, "_id")

line, err := json.Marshal(mdoc)
if err != nil {
return fmt.Errorf("failed to marshal document: %w", err)
}

ret := make(map[string]interface{})
ret["index"] = index
metaLine, err := json.Marshal(ret)
Expand Down
Loading

0 comments on commit 7862897

Please sign in to comment.