diff --git a/.gitignore b/.gitignore index b16c7bc..e0f081d 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,6 @@ workers/java/bin workers/java/build /loadgen/kitchen-sink-gen/target/ + +# Ignore temporary files +**/omes-temp*/ \ No newline at end of file diff --git a/README.md b/README.md index 000664e..f2bf1b0 100644 --- a/README.md +++ b/README.md @@ -111,6 +111,23 @@ This will produce an image tagged like `-go-v1.24.0`. Publishing images is typically done via CI, using the `push-images` command. See the GHA workflows for more. +### Debugging workers + +First, generate the worker code: + +```shell +go run ./cmd prepare-worker --dir-name omes-temp-worker --language go +``` + +Then, run it: + +```shell +cd workers/go/omes-temp-worker +SCENARIO=completion_callbacks +RUN_ID=local-test-run +dlv debug -- --task-queue "${SCENARIO}:${RUN_ID}" +``` + ## Design decisions ### Kitchen Sink Workflow diff --git a/cmd/cleanup_scenario.go b/cmd/cleanup_scenario.go index f3358aa..87119fb 100644 --- a/cmd/cleanup_scenario.go +++ b/cmd/cleanup_scenario.go @@ -7,6 +7,7 @@ import ( "os/user" "time" + "github.com/pborman/uuid" "github.com/spf13/cobra" "github.com/spf13/pflag" "github.com/temporalio/omes/cmd/cmdoptions" @@ -68,7 +69,7 @@ func (c *scenarioCleaner) run(ctx context.Context) error { client := c.clientOptions.MustDial(metrics, c.logger) defer client.Close() taskQueue := loadgen.TaskQueueForRun(c.scenario, c.runID) - jobID := "omes-cleanup-" + taskQueue + jobID := "omes-cleanup-" + taskQueue + "-" + uuid.New() username, hostname := "anonymous", "unknown" if user, err := user.Current(); err == nil { username = user.Name diff --git a/cmd/prepare_worker.go b/cmd/prepare_worker.go index 51d0818..dd15582 100644 --- a/cmd/prepare_worker.go +++ b/cmd/prepare_worker.go @@ -22,7 +22,13 @@ func prepareWorkerCmd() *cobra.Command { Short: "Build worker ready to run", Run: func(cmd *cobra.Command, args []string) { if _, err := b.build(cmd.Context()); err != nil { - b.logger.Fatal(err) + logger := b.logger + if logger != nil { + logger.Fatal(err) + } else { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } } }, } @@ -94,7 +100,9 @@ func (b *workerBuilder) buildGo(ctx context.Context, baseDir string) (sdkbuild.P Version: b.version, GoModContents: `module github.com/temporalio/omes-worker -go 1.20 +go 1.21 + +toolchain go1.21.5 require github.com/temporalio/omes v1.0.0 require github.com/temporalio/omes/workers/go v1.0.0 diff --git a/common/workflows.go b/common/workflows.go new file mode 100644 index 0000000..1bb0f6c --- /dev/null +++ b/common/workflows.go @@ -0,0 +1,6 @@ +package common + +const ( + WorkflowNameKitchenSink = "kitchenSink" + WorkflowNameThroughputStress = "throughputStress" +) diff --git a/go.mod b/go.mod index 9bf8222..32b94b0 100644 --- a/go.mod +++ b/go.mod @@ -1,19 +1,24 @@ module github.com/temporalio/omes -go 1.20 +go 1.21 + +toolchain go1.21.5 require ( + github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.3 + github.com/pborman/uuid v1.2.1 github.com/prometheus/client_golang v1.16.0 github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 - github.com/temporalio/features v0.0.0-20231218231852-27c681667dae - go.temporal.io/api v1.26.1 - go.temporal.io/sdk v1.25.2-0.20231129171107-288a04f72145 - go.uber.org/zap v1.25.0 - golang.org/x/mod v0.12.0 + github.com/temporalio/features v0.0.0-20240110061640-5b9f4be6bb76 + go.temporal.io/api v1.26.1-0.20240106224952-e65d246174b6 + go.temporal.io/sdk v1.25.2-0.20240110060334-d09a3f3bfe09 + go.uber.org/multierr v1.11.0 + go.uber.org/zap v1.26.0 + golang.org/x/mod v0.14.0 golang.org/x/sync v0.5.0 golang.org/x/sys v0.15.0 google.golang.org/protobuf v1.31.0 @@ -30,33 +35,31 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect github.com/golang/mock v1.6.0 // indirect github.com/google/uuid v1.3.1 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/otiai10/copy v1.14.0 // indirect - github.com/pborman/uuid v1.2.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.1 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/goleak v1.2.1 // indirect - go.uber.org/multierr v1.11.0 // indirect + golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.3.0 // indirect - google.golang.org/genproto v0.0.0-20231127180814-3a041ad873d4 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect - google.golang.org/grpc v1.59.0 // indirect + google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 // indirect + google.golang.org/grpc v1.60.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) // This is dumb, but necesary because Go (for some commands) can't figure out the transitive // local-replace inside of the features module itself, so we have to help it. replace ( - github.com/temporalio/features/features => github.com/temporalio/features/features v0.0.0-20231218231852-27c681667dae - github.com/temporalio/features/harness/go => github.com/temporalio/features/harness/go v0.0.0-20231218231852-27c681667dae + github.com/temporalio/features/features => github.com/temporalio/features/features v0.0.0-20240110061640-5b9f4be6bb76 + github.com/temporalio/features/harness/go => github.com/temporalio/features/harness/go v0.0.0-20240110061640-5b9f4be6bb76 ) diff --git a/go.sum b/go.sum index 107fd44..c9db48a 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -37,6 +36,7 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -51,15 +51,18 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/otiai10/copy v1.14.0 h1:dCI/t1iTdYGtkvCuBG2BgR6KZa83PTclw4U5n2wAllU= github.com/otiai10/copy v1.14.0/go.mod h1:ECfuL02W+/FkTWZWgQqXPWZgW9oeKCSQ5qVfSc4qc4w= github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks= +github.com/otiai10/mint v1.5.1/go.mod h1:MJm72SBthJjz8qhefc4z1PYEieWmy8Bku7CjcAqyUSM= github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -77,6 +80,7 @@ github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/ github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= @@ -98,15 +102,15 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/temporalio/features v0.0.0-20231218231852-27c681667dae h1:d5LK3X10VZEWpLhZ5dIPcirvELKVtT4rEV+8wzfgBRM= -github.com/temporalio/features v0.0.0-20231218231852-27c681667dae/go.mod h1:Jm0Yq8DKEkSzcQ1YbZ5yeqrD6iyyWzQMcsXF0G1ylM4= +github.com/temporalio/features v0.0.0-20240110061640-5b9f4be6bb76 h1:z840SAHHUvN7UZfAFGVIeUDhdIba+povmMJ95X39g0U= +github.com/temporalio/features v0.0.0-20240110061640-5b9f4be6bb76/go.mod h1:0P+BrDzZGfYK0ibonxtmsjJBjKsudAhhKVPpn0jwx6Y= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.26.1 h1:YqGQsOr/Tx4nVdA8wCv74AxesaIzCRHWb3KkHrYqI8k= -go.temporal.io/api v1.26.1/go.mod h1:Y/rALXTprFO+bvAlAfLFoJj7KpQIcL4GDQVN6fhYIa4= -go.temporal.io/sdk v1.25.2-0.20231129171107-288a04f72145 h1:aV7tRpzB3tr9LGs4/SN7MSWSbVx+bgDYfOoGMjk4oEM= -go.temporal.io/sdk v1.25.2-0.20231129171107-288a04f72145/go.mod h1:MHw8PEOVmOJC1yduTVxYq1GsM5kkQg0sIwRST7cRHoo= +go.temporal.io/api v1.26.1-0.20240106224952-e65d246174b6 h1:HMgz/R0Zy60KD6/QS2zyWBFgUCSusXMimXUl9fIWcg4= +go.temporal.io/api v1.26.1-0.20240106224952-e65d246174b6/go.mod h1:d5Zx/HUcbL3/wmH89RNnjeV2BuOHeofAkilC4F2MuKA= +go.temporal.io/sdk v1.25.2-0.20240110060334-d09a3f3bfe09 h1:8ZFyOs0mVb4Sn+Oxqq1YlgMcIFijL/KBDKoqtG0u++Q= +go.temporal.io/sdk v1.25.2-0.20240110060334-d09a3f3bfe09/go.mod h1:Gf7Zou4O35s8/F3rmKsu0C5hXD+BYBJHHz273ji7Eiw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= @@ -117,12 +121,14 @@ go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9i go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= -go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= -go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= +golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -130,8 +136,8 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= -golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -190,19 +196,19 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= -google.golang.org/genproto v0.0.0-20231127180814-3a041ad873d4 h1:W12Pwm4urIbRdGhMEg2NM9O3TWKjNcxQhs46V0ypf/k= -google.golang.org/genproto v0.0.0-20231127180814-3a041ad873d4/go.mod h1:5RBcpGRxr25RbDzY5w+dmaqpSEvl8Gwl1x2CICf60ic= -google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4 h1:ZcOkrmX74HbKFYnpPY8Qsw93fC29TbJXspYKaBkSXDQ= -google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4/go.mod h1:k2dtGpRrbsSyKcNPKKI5sstZkrNCZwpU/ns96JoHbGg= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 h1:DC7wcm+i+P1rN3Ff07vL+OndGg5OhNddHyTA+ocPqYE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM= +google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 h1:YJ5pD9rF8o9Qtta0Cmy9rdBwkSjrTCT6XTiUQVOtIos= +google.golang.org/genproto v0.0.0-20231212172506-995d672761c0/go.mod h1:l/k7rMz0vFTBPy+tFSGvXEd3z+BcoG1k7EHbqm+YBsY= +google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0 h1:s1w3X6gQxwrLEpxnLd/qXTVLgQE2yXwaOaoa6IlY/+o= +google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0/go.mod h1:CAny0tYF+0/9rmDB9fahA9YLzX3+AEVl1qXbv5hhj6c= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 h1:/jFB8jK5R3Sq3i/lmeZO0cATSzFfZaJq1J2Euan3XKU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0/go.mod h1:FUoWkonphQm3RhTS+kOEhF8h0iDpm4tdXolVCeZ9KKA= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= -google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= -google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= +google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= @@ -210,6 +216,7 @@ google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/loadgen/scenario.go b/loadgen/scenario.go index c918f44..0c19f4b 100644 --- a/loadgen/scenario.go +++ b/loadgen/scenario.go @@ -3,14 +3,15 @@ package loadgen import ( "context" "fmt" - "go.temporal.io/api/enums/v1" - "go.temporal.io/api/operatorservice/v1" "path/filepath" "runtime" "strconv" "strings" "time" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/operatorservice/v1" + "github.com/temporalio/omes/loadgen/kitchensink" "go.temporal.io/sdk/client" "go.uber.org/zap" @@ -96,18 +97,61 @@ type ScenarioInfo struct { } func (s *ScenarioInfo) ScenarioOptionInt(name string, defaultValue int) int { - v := s.ScenarioOptions[name] - if v == "" { + return ScenarioOptionInt(s.ScenarioOptions, name, defaultValue) +} + +func ScenarioOptionInt(options map[string]string, name string, defaultValue int) int { + return ScenarioOption(options, name, defaultValue, strconv.Atoi) +} + +func (s *ScenarioInfo) ScenarioOptionFloat64(name string, defaultValue float64) float64 { + return ScenarioOptionFloat64(s.ScenarioOptions, name, defaultValue) +} + +func ScenarioOptionFloat64(options map[string]string, name string, defaultValue float64) float64 { + return ScenarioOption(options, name, defaultValue, func(s string) (float64, error) { + return strconv.ParseFloat(s, 64) + }) +} + +func (s *ScenarioInfo) ScenarioOptionDuration(name string, defaultValue time.Duration) time.Duration { + return ScenarioOptionDuration(s.ScenarioOptions, name, defaultValue) +} + +func ScenarioOptionDuration(options map[string]string, name string, defaultValue time.Duration) time.Duration { + return ScenarioOption(options, name, defaultValue, time.ParseDuration) +} + +func (r *Run) ScenarioOptionBool(name string, defaultValue bool) bool { + return ScenarioOptionBool(r.ScenarioOptions, name, defaultValue) +} + +func ScenarioOptionBool(options map[string]string, name string, defaultValue bool) bool { + s := options[name] + if s == "" { + return defaultValue + } + v, err := strconv.ParseBool(s) + if err != nil { + panic(err) + } + return v +} + +func ScenarioOption[T any](options map[string]string, name string, defaultValue T, f func(string) (T, error)) T { + s := options[name] + if s == "" { return defaultValue } - i, err := strconv.Atoi(v) + v, err := f(s) if err != nil { panic(err) } - return i + return v } const DefaultIterations = 10 + const DefaultMaxConcurrent = 10 type RunConfiguration struct { diff --git a/scenarios/completion_callbacks.go b/scenarios/completion_callbacks.go new file mode 100644 index 0000000..32d4a04 --- /dev/null +++ b/scenarios/completion_callbacks.go @@ -0,0 +1,439 @@ +package scenarios + +import ( + "context" + "fmt" + "math" + "math/rand" + "net/url" + "strings" + "sync/atomic" + "time" + + "github.com/facebookgo/clock" + "github.com/pborman/uuid" + "github.com/temporalio/omes/common" + "github.com/temporalio/omes/loadgen" + "github.com/temporalio/omes/loadgen/kitchensink" + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/client" + "go.uber.org/multierr" + "go.uber.org/zap" + "golang.org/x/time/rate" +) + +// This file contains code for a scenario to test workflow completion callbacks. You can test it by running: +// go run ./cmd run-scenario-with-worker --language go --scenario completion_callbacks --option hostName=localhost,startingPort=9000,numCallbackHosts=10,maxDelay=1s,maxErrorProbability=0.1,lambda=1,halfLife=1,dryRun=false --iterations 10 --max-concurrent 10 + +// CompletionCallbackScenarioOptions are the options for the CompletionCallbackScenario. +type CompletionCallbackScenarioOptions struct { + // RPS is the maximum number of requests per second to send. This is required and must be > 0. + RPS int + // Logger must be non-nil. + Logger *zap.SugaredLogger + // SdkClient is the client to use to start workflows. This is required. + SdkClient client.Client + // Clock is for retry delays. This is required. + Clock clock.Clock + // StartingPort is the port of the first host to use for callbacks. Each host will use a different port starting + // from this value. This is required and must be in [1024, 65535]. + StartingPort int + // NumCallbackHosts is the number of hosts to use for callbacks. This is required and must be > 0. + NumCallbackHosts int + // CallbackHostName is the host name to use for the callback URL. Do not include the port. + CallbackHostName string + // DryRun determines whether this is a dry run. If the value is true, the scenario will not actually execute + // workflows, but will instead just log what it would have done. + DryRun bool + // Lambda is the λ parameter for the exponential distribution function used to determine which host to use for a + // given workflow. A value close to zero means that all hosts have a similar priority of being selected. The higher + // the value, the more likely it is that the first host will be selected. This must be > 0. + Lambda float64 + // HalfLife is τ for the exponential decay function used to determine the delay and error probability for a given + // host. This means that the delay and error probability will be halved for each subsequent host. This must be > 0. + // Set it to a very large value to make all hosts have the same delay and error probability. Set it to a very small + // value to make only the first host have a very large delay and error probability. + HalfLife float64 + // MaxDelay is the maximum delay to use for a callback. The actual delay will be this value times the exponential + // decay function of the host index. This must be >= 0. + MaxDelay time.Duration + // MaxErrorProbability is the maximum probability that a callback will fail. This is used to simulate a callback + // that fails to be delivered. The actual probability of failure will be this value times the exponential decay + // function of the host index. This must be in [0, 1]. + MaxErrorProbability float64 + // AttachWorkflowID determines whether the workflow ID should be attached to the callback URL. This is useful for + // debugging. + AttachWorkflowID bool + // AttachCallbacks determines whether the callback URLs should be attached to the workflow. + AttachCallbacks bool + // MaxErrors is the maximum number of errors to allow before failing the scenario. + MaxErrors int +} + +type completionCallbackScenarioIterationResult struct { + // WorkflowID of the workflow that was executed. + WorkflowID string + // RunID of the workflow that was executed. + RunID string + // URL of the callback that was used. + URL *url.URL +} + +type completionCallbackScenarioExecutor struct{} + +const ( + // OptionKeyRPS determines CompletionCallbackScenarioOptions.RPS. The default value is 10. + OptionKeyRPS = "rps" + // OptionKeyStartingPort determines CompletionCallbackScenarioOptions.StartingPort. + OptionKeyStartingPort = "startingPort" + // OptionKeyNumCallbackHosts determines CompletionCallbackScenarioOptions.NumCallbackHosts. + OptionKeyNumCallbackHosts = "numCallbackHosts" + // OptionKeyCallbackHostName determines CompletionCallbackScenarioOptions.CallbackHostName. + OptionKeyCallbackHostName = "hostName" + // OptionKeyDryRun determines CompletionCallbackScenarioOptions.DryRun. + OptionKeyDryRun = "dryRun" + // OptionKeyLambda determines CompletionCallbackScenarioOptions.Lambda. The default value is 1.0. + OptionKeyLambda = "lambda" + // OptionKeyHalfLife determines CompletionCallbackScenarioOptions.HalfLife. The default value is 1.0. + OptionKeyHalfLife = "halfLife" + // OptionKeyMaxDelay determines CompletionCallbackScenarioOptions.MaxDelay. The default value is 1s. + OptionKeyMaxDelay = "maxDelay" + // OptionKeyMaxErrorProbability determines CompletionCallbackScenarioOptions.MaxErrorProbability. + // The default value is 0.05. + OptionKeyMaxErrorProbability = "maxErrorProbability" + // OptionKeyAttachWorkflowID determines CompletionCallbackScenarioOptions.AttachWorkflowID. The default value is + // true. + OptionKeyAttachWorkflowID = "attachWorkflowID" + // OptionKeyAttachCallbacks determines CompletionCallbackScenarioOptions.AttachCallbacks. The default value is + // true. + OptionKeyAttachCallbacks = "attachCallbacks" + // OptionKeyMaxErrors determines CompletionCallbackScenarioOptions.MaxErrors. The default value is 1. + OptionKeyMaxErrors = "maxErrors" +) + +func init() { + loadgen.MustRegisterScenario(loadgen.Scenario{ + Description: "For this scenario, Iterations is not supported and Duration is required. We run a single" + + " iteration which will spawn a number of workflows, execute them, and verify that all callbacks are" + + " eventually delivered.", + Executor: completionCallbackScenarioExecutor{}, + }) +} + +// ExponentialSample returns a sample from an exponential distribution with the given lambda. +// The cdf of the exponential distribution is: +// +// cdf(x) = 1 - lambda * exp(-lambda * x) +// +// Here, the probability that the returned value is equal to `i` is: +// 0 if i < 0 or i > n +// (cdf(i+1)-cdf(i))/cdf(n) if 0 <= i < n +// +// The `u` parameter should be a uniform random number in [0, 1). +func ExponentialSample(n int, lambda float64, u float64) (int, error) { + if u < 0 || u >= 1 { + return 0, fmt.Errorf("u must be in [0, 1)") + } + totalProbability := 1 - math.Exp(-lambda*float64(n)) + for i := 1; i < n; i++ { + cdf := 1 - math.Exp(-lambda*float64(i)) + if u <= cdf/totalProbability { + return i - 1, nil + } + } + return n - 1, nil +} + +// RunCompletionCallbackScenario runs a scenario where each iteration executes a single workflow that has a completion +// callback attached targeting one of a given set of addresses. After each iteration, we query all workflows to verify +// that all callbacks have been delivered. +func RunCompletionCallbackScenario( + ctx context.Context, + opts *CompletionCallbackScenarioOptions, + info loadgen.ScenarioInfo, +) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + if err := validateOptions(opts); err != nil { + return err + } + opts.Logger.Infow("Starting scenario", "options", opts) + var ( + numCallbacksDelivered, numCallbacksFailed, numWorkflowsStarted, numWorkflowsFinished, numWorkflowsFailed atomic.Int32 + ) + go func() { + t := time.NewTicker(time.Second) + for { + select { + case <-ctx.Done(): + t.Stop() + return + case <-t.C: + opts.Logger.Infow("Scenario status", + "numWorkflowsStarted", numWorkflowsStarted.Load(), "numWorkflowsFinished", numWorkflowsFinished.Load(), "numWorkflowsFailed", numWorkflowsFailed.Load(), + "numCallbacksDelivered", numCallbacksDelivered.Load(), "numCallbacksFailed", numCallbacksFailed.Load(), + ) + } + } + }() + rateLimiter := rate.NewLimiter(rate.Limit(opts.RPS), opts.RPS) + results := make([]*completionCallbackScenarioIterationResult, 0, info.Configuration.Iterations) + l := &loadgen.GenericExecutor{ + Execute: func(ctx context.Context, run *loadgen.Run) error { + res, err := runWorkflow(ctx, opts, run.DefaultStartWorkflowOptions(), rateLimiter, &numWorkflowsStarted) + if err != nil { + opts.Logger.Errorw("Run workflow failed", "error", err) + numFailed := numWorkflowsFailed.Add(1) + if numFailed > int32(opts.MaxErrors) { + return fmt.Errorf("max errors exceeded: %d; last error: %w", opts.MaxErrors, err) + } + return nil + } + numWorkflowsFinished.Add(1) + opts.Logger.Debugw("Workflow finished", "url", res.URL.String()) + results = append(results, res) + return nil + }, + } + if err := l.Run(ctx, info); err != nil { + return fmt.Errorf("completion callback scenario run generic executor: %w", err) + } + opts.Logger.Infow("All workflows finished", "numWorkflows", len(results)) + if !opts.AttachCallbacks { + opts.Logger.Infow("Skipping callback verification because callbacks are not attached") + return nil + } + for _, res := range results { + opts.Logger.Debugw("Verifying callback succeeded", "url", res.URL.String()) + err := verifyCallbackSucceeded(ctx, opts, rateLimiter, res.WorkflowID, res.RunID, res.URL) + if err != nil { + numCallbacksFailed.Add(1) + opts.Logger.Errorw("Callback verification failed", "url", res.URL.String(), "error", err) + } else { + numCallbacksDelivered.Add(1) + opts.Logger.Debugw("Callback succeeded", "url", res.URL.String()) + } + } + if numCallbacksFailed.Load() > 0 { + return fmt.Errorf("%d callbacks failed", numCallbacksFailed.Load()) + } + return nil +} + +func (completionCallbackScenarioExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error { + opts := &CompletionCallbackScenarioOptions{} + parseOptions(info.ScenarioOptions, opts) + opts.Clock = clock.New() + opts.Logger = info.Logger + opts.SdkClient = info.Client + return RunCompletionCallbackScenario(ctx, opts, info) +} + +func timed(f func() error) error { + _, err := timed2(func() (struct{}, error) { + return struct{}{}, f() + }) + return err +} + +func timed2[T any](f func() (T, error)) (T, error) { + now := time.Now() + t, err := f() + if err != nil { + if strings.Contains(err.Error(), "deadline") { + return t, fmt.Errorf("deadline exceeded after %v", time.Since(now)) + } + return t, err + } + return t, nil +} + +func runWorkflow( + ctx context.Context, + scenarioOptions *CompletionCallbackScenarioOptions, + startWorkflowOptions client.StartWorkflowOptions, + limiter *rate.Limiter, + numStarted *atomic.Int32, +) (*completionCallbackScenarioIterationResult, error) { + workflowID := uuid.New() + u, err := generateURLFromOptions(scenarioOptions, workflowID) + if err != nil { + return nil, fmt.Errorf("generate callback URL: %w", err) + } + + if scenarioOptions.DryRun { + return nil, nil + } + + if scenarioOptions.AttachCallbacks { + completionCallbacks := []*commonpb.Callback{{ + Variant: &commonpb.Callback_Nexus_{ + Nexus: &commonpb.Callback_Nexus{ + Url: u.String(), + }, + }, + }} + startWorkflowOptions.CompletionCallbacks = completionCallbacks + } + startWorkflowOptions.ID = workflowID + input := &kitchensink.WorkflowInput{ + InitialActions: []*kitchensink.ActionSet{ + kitchensink.NoOpSingleActivityActionSet(), + }, + } + if err := limiter.Wait(ctx); err != nil { + return nil, fmt.Errorf("wait for rate limiter to start workflow: %w", err) + } + workflowRun, err := timed2(func() (client.WorkflowRun, error) { + return scenarioOptions.SdkClient.ExecuteWorkflow(ctx, startWorkflowOptions, common.WorkflowNameKitchenSink, input) + }) + if err != nil { + return nil, fmt.Errorf("start workflow with completion callback: %w", err) + } + numStarted.Add(1) + if err := limiter.Wait(ctx); err != nil { + return nil, fmt.Errorf("wait for rate limiter to get workflow completion: %w", err) + } + err = workflowRun.Get(ctx, nil) + if err != nil { + return nil, fmt.Errorf("wait for workflow with completion callback: %w", err) + } + + return &completionCallbackScenarioIterationResult{ + WorkflowID: workflowRun.GetID(), + RunID: workflowRun.GetRunID(), + URL: u, + }, nil +} + +func verifyCallbackSucceeded( + ctx context.Context, + options *CompletionCallbackScenarioOptions, + limiter *rate.Limiter, + workflowID string, + runID string, + u *url.URL, +) error { + retryDelay := time.Millisecond * 10 + for { + if err := limiter.Wait(ctx); err != nil { + return fmt.Errorf("wait for rate limiter to verify callback succeeded: %w", err) + } + execution, err := options.SdkClient.DescribeWorkflowExecution(ctx, workflowID, runID) + if err != nil { + return fmt.Errorf("verify callback succeeded describe workflow: %w", err) + } + callbacks := execution.Callbacks + if len(callbacks) != 1 { + callbacksString := "" + for i, callback := range callbacks { + callbacksString += fmt.Sprintf("%d: %t: %+v\n", i, callback == nil, callback) + } + return fmt.Errorf("expected 1 callback, got %d: %s", len(callbacks), callbacksString) + } + callback := callbacks[0] + if callback.State == enums.CALLBACK_STATE_SUCCEEDED { + if callback.Callback.GetNexus().Url != u.String() { + return fmt.Errorf("expected callback URL %q, got %q", u.String(), callback.Callback.GetNexus().Url) + } + return nil + } + if callback.State == enums.CALLBACK_STATE_BACKING_OFF { + options.Logger.Debugw("Callback backing off", "failure", callback.LastAttemptFailure) + } + if callback.State == enums.CALLBACK_STATE_FAILED { + return fmt.Errorf("callback failed: %+v", callback.LastAttemptFailure) + } + timer := options.Clock.Timer(retryDelay) + select { + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + case <-timer.C: + } + retryDelay *= 2 + } +} + +// validateOptions validates the options for this scenario. +func validateOptions(options *CompletionCallbackScenarioOptions) error { + var errs []error + if options.RPS <= 0 { + errs = append(errs, fmt.Errorf("%q is required and must be > 0", OptionKeyRPS)) + } + if options.StartingPort < 1024 || options.StartingPort >= 65535 { + errs = append(errs, fmt.Errorf("%q is required and must be in [1024, 65535]", OptionKeyStartingPort)) + } + if options.NumCallbackHosts <= 0 { + errs = append(errs, fmt.Errorf("%q is required and must be > 0", OptionKeyNumCallbackHosts)) + } + if options.CallbackHostName == "" { + errs = append(errs, fmt.Errorf("%q is required", OptionKeyCallbackHostName)) + } + if options.Lambda <= 0 { + errs = append(errs, fmt.Errorf("%q must be > 0", OptionKeyLambda)) + } + if options.HalfLife <= 0 { + errs = append(errs, fmt.Errorf("%q must be > 0", OptionKeyHalfLife)) + } + if options.MaxDelay < 0 { + errs = append(errs, fmt.Errorf("%q must be >= 0s", OptionKeyMaxDelay)) + } + if options.MaxErrorProbability < 0 || options.MaxErrorProbability > 1 { + errs = append(errs, fmt.Errorf("%q must be in [0, 1]", OptionKeyMaxErrorProbability)) + } + if len(errs) > 0 { + return multierr.Combine(errs...) + } + + return nil +} + +// parseOptions parses the options for this scenario from the given map. +func parseOptions(m map[string]string, options *CompletionCallbackScenarioOptions) *CompletionCallbackScenarioOptions { + options.RPS = loadgen.ScenarioOptionInt(m, OptionKeyRPS, 10) + options.StartingPort = loadgen.ScenarioOptionInt(m, OptionKeyStartingPort, 0) + options.NumCallbackHosts = loadgen.ScenarioOptionInt(m, OptionKeyNumCallbackHosts, 0) + options.CallbackHostName = m[OptionKeyCallbackHostName] + options.DryRun = loadgen.ScenarioOptionBool(m, OptionKeyDryRun, false) + options.Lambda = loadgen.ScenarioOptionFloat64(m, OptionKeyLambda, 1.0) + options.HalfLife = loadgen.ScenarioOptionFloat64(m, OptionKeyHalfLife, 1.0) + options.MaxDelay = loadgen.ScenarioOptionDuration(m, OptionKeyMaxDelay, time.Second*1) + options.MaxErrorProbability = loadgen.ScenarioOptionFloat64(m, OptionKeyMaxErrorProbability, 0.05) + options.AttachWorkflowID = loadgen.ScenarioOptionBool(m, OptionKeyAttachWorkflowID, true) + options.AttachCallbacks = loadgen.ScenarioOptionBool(m, OptionKeyAttachCallbacks, true) + options.MaxErrors = loadgen.ScenarioOptionInt(m, OptionKeyMaxErrors, 0) + return options +} + +// generateURLFromOptions generates a callback URL from the given options. +func generateURLFromOptions(options *CompletionCallbackScenarioOptions, workflowID string) (*url.URL, error) { + hostIndex, err := ExponentialSample(options.NumCallbackHosts, options.Lambda, rand.Float64()) + if err != nil { + return nil, err + } + + // The decayLambda is different from the lambda parameter used to select the host. + // https://en.wikipedia.org/wiki/Exponential_decay#Half-life + decayLambda := math.Ln2 / options.HalfLife + + callbackDelay := time.Duration(options.MaxDelay.Seconds() * math.Exp(-decayLambda*float64(hostIndex)) * float64(time.Second)) + + errorProbability := options.MaxErrorProbability * math.Exp(-decayLambda*float64(hostIndex)) + + port := options.StartingPort + hostIndex + + q := url.Values{} + q.Add("delay", fmt.Sprintf("%s", callbackDelay)) + q.Add("failure-probability", fmt.Sprintf("%f", errorProbability)) + if options.AttachWorkflowID { + q.Add("workflow-id", workflowID) + } + u := &url.URL{ + Scheme: "http", + Host: fmt.Sprintf("%s:%d", options.CallbackHostName, port), + RawQuery: q.Encode(), + } + return u, nil +} diff --git a/scenarios/completion_callbacks_test.go b/scenarios/completion_callbacks_test.go new file mode 100644 index 0000000..1f7aae8 --- /dev/null +++ b/scenarios/completion_callbacks_test.go @@ -0,0 +1,266 @@ +package scenarios_test + +import ( + "context" + "fmt" + "math/rand" + "net/url" + "strconv" + "testing" + "time" + + "github.com/facebookgo/clock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/temporalio/omes/loadgen" + "github.com/temporalio/omes/scenarios" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/enums/v1" + "go.temporal.io/api/workflow/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/mocks" + "go.uber.org/zap" +) + +func TestExponentialSample(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + lambda float64 + numSamples int + expectedCounts []int + }{ + {"large lambda", 3.0, 500, []int{480, 19, 1, 0, 0}}, + {"lambda=1.0", 1.0, 500, []int{338, 111, 35, 9, 7}}, + {"small lambda", 1e-9, 500, []int{107, 122, 86, 102, 83}}, + } { + t.Run(tc.name, func(t *testing.T) { + g := rand.New(rand.NewSource(6174)) + counts := make([]int, len(tc.expectedCounts)) + for i := 0; i < tc.numSamples; i++ { + sample, err := scenarios.ExponentialSample(len(tc.expectedCounts), tc.lambda, g.Float64()) + require.NoError(t, err) + counts[sample]++ + } + assert.Equal(t, tc.expectedCounts, counts, + "Counts should be roughly proportional to the exponential distribution") + }) + } +} + +func TestNewCompletionCallbackScenario(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + scenarioOptionsOverride func(opts *scenarios.CompletionCallbackScenarioOptions) + expectedErrSubstring string + }{ + { + name: "zero port", + scenarioOptionsOverride: func(opts *scenarios.CompletionCallbackScenarioOptions) { + opts.StartingPort = 0 + }, + expectedErrSubstring: fmt.Sprintf("%q is required", scenarios.OptionKeyStartingPort), + }, + { + name: "zero callback hosts", + scenarioOptionsOverride: func(opts *scenarios.CompletionCallbackScenarioOptions) { + opts.NumCallbackHosts = 0 + }, + expectedErrSubstring: fmt.Sprintf("%q is required", scenarios.OptionKeyNumCallbackHosts), + }, + { + name: "negative lambda", + scenarioOptionsOverride: func(opts *scenarios.CompletionCallbackScenarioOptions) { + opts.Lambda = -1.0 + }, + expectedErrSubstring: fmt.Sprintf("%q must be > 0", scenarios.OptionKeyLambda), + }, + { + name: "zero half-life", + scenarioOptionsOverride: func(opts *scenarios.CompletionCallbackScenarioOptions) { + opts.HalfLife = 0.0 + }, + expectedErrSubstring: fmt.Sprintf("%q must be > 0", scenarios.OptionKeyHalfLife), + }, + { + name: "negative delay distribution max", + scenarioOptionsOverride: func(opts *scenarios.CompletionCallbackScenarioOptions) { + opts.MaxDelay = -1.0 + }, + expectedErrSubstring: fmt.Sprintf("%q must be >= 0s", scenarios.OptionKeyMaxDelay), + }, + { + name: "negative error probability max", + scenarioOptionsOverride: func(opts *scenarios.CompletionCallbackScenarioOptions) { + opts.MaxErrorProbability = -1.0 + }, + expectedErrSubstring: fmt.Sprintf("%q must be in [0, 1]", scenarios.OptionKeyMaxErrorProbability), + }, + { + name: "error probability max > 1", + scenarioOptionsOverride: func(opts *scenarios.CompletionCallbackScenarioOptions) { + opts.MaxErrorProbability = 1.1 + }, + expectedErrSubstring: fmt.Sprintf("%q must be in [0, 1]", scenarios.OptionKeyMaxErrorProbability), + }, + { + name: "no host name", + scenarioOptionsOverride: func(opts *scenarios.CompletionCallbackScenarioOptions) { + opts.CallbackHostName = "" + }, + expectedErrSubstring: fmt.Sprintf("%q is required", scenarios.OptionKeyCallbackHostName), + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + scenarioOptions := &scenarios.CompletionCallbackScenarioOptions{ + CallbackHostName: "localhost", + StartingPort: 1024, + NumCallbackHosts: 10, + Lambda: 1.0, + HalfLife: 1.0, + MaxDelay: 5 * time.Second, + MaxErrorProbability: 0.0, + } + if tc.scenarioOptionsOverride != nil { + tc.scenarioOptionsOverride(scenarioOptions) + } + err := scenarios.RunCompletionCallbackScenario(context.Background(), scenarioOptions, loadgen.ScenarioInfo{}) + if tc.expectedErrSubstring != "" { + assert.ErrorContains(t, err, tc.expectedErrSubstring) + } + }) + } +} + +func TestCompletionCallbackScenario_Run(t *testing.T) { + t.Parallel() + + // Timeout the test after a second. + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, time.Second) + t.Cleanup(cancel) + + // Mock SDK client. + sdkClient := &mocks.Client{} + executeWorkflowRequests := make(chan client.StartWorkflowOptions, 1) + workflowRun := &mocks.WorkflowRun{} + workflowRun.On("Get", mock.Anything, mock.Anything).Return(nil) + workflowRun.On("GetID").Return("test-workflow-id") + workflowRun.On("GetRunID").Return("test-run-id") + sdkClient.On("ExecuteWorkflow", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + select { + case <-ctx.Done(): + t.Fatalf("test canceled before workflow execution: %v", ctx.Err()) + case executeWorkflowRequests <- args.Get(1).(client.StartWorkflowOptions): + } + }).Return(workflowRun, nil) + + // First call to DescribeWorkflowExecution returns a backing off callback. + sdkClient.On("DescribeWorkflowExecution", mock.Anything, mock.Anything, mock.Anything).Return( + &workflowservice.DescribeWorkflowExecutionResponse{ + Callbacks: []*workflow.CallbackInfo{ + { + State: enums.CALLBACK_STATE_BACKING_OFF, + }, + }, + }, + nil, + ).Times(1) + + // Advance the clock so that we retry after 1 timer. + clkDone := make(chan struct{}) + clk := clock.NewMock() + defer func() { + select { + case <-clkDone: + case <-ctx.Done(): + t.Errorf("test canceled before clock finished: %v", ctx.Err()) + } + }() + go func() { + defer close(clkDone) + clk.Wait(clock.Calls{ + Timer: 1, + }) + clk.Add(time.Second) + }() + + // Second call to DescribeWorkflowExecution returns a succeeded callback. + sdkClient.On("DescribeWorkflowExecution", mock.Anything, mock.Anything, mock.Anything).Return( + &workflowservice.DescribeWorkflowExecutionResponse{ + Callbacks: []*workflow.CallbackInfo{ + { + State: enums.CALLBACK_STATE_SUCCEEDED, + Callback: &common.Callback{ + Variant: &common.Callback_Nexus_{ + Nexus: &common.Callback_Nexus{ + Url: "http://localhost:1024?delay=0s&failure-probability=0.000000", + }, + }, + }, + }, + }, + }, + nil, + ).Times(1) + + // Create the scenario. + logger := zap.NewNop() + opts := &scenarios.CompletionCallbackScenarioOptions{ + RPS: 100, + Logger: logger.Sugar(), + SdkClient: sdkClient, + Clock: clk, + StartingPort: 1024, + NumCallbackHosts: 1, + CallbackHostName: "localhost", + DryRun: false, + Lambda: 1.0, + HalfLife: 1.0, + MaxDelay: 0, + MaxErrorProbability: 0.0, + AttachWorkflowID: false, + AttachCallbacks: true, + } + + // Run the scenario. + err := scenarios.RunCompletionCallbackScenario(ctx, opts, loadgen.ScenarioInfo{ + MetricsHandler: client.MetricsNopHandler, + Logger: logger.Sugar(), + Configuration: loadgen.RunConfiguration{ + Iterations: 1, + }, + }) + require.NoError(t, err) + + // Get the request sent to the SDK client and verify it. + var startWorkflowOptions client.StartWorkflowOptions + select { + case <-ctx.Done(): + t.Fatalf("test canceled before workflow execution: %v", ctx.Err()) + case startWorkflowOptions = <-executeWorkflowRequests: + } + if assert.Len(t, startWorkflowOptions.CompletionCallbacks, 1) { + cb := startWorkflowOptions.CompletionCallbacks[0] + nexusCb := cb.GetNexus() + require.NotNilf(t, nexusCb, "Completion callback should be a Nexus callback") + u, err := url.Parse(nexusCb.Url) + require.NoError(t, err) + assert.Equal(t, "http", u.Scheme) + assert.Equal(t, "localhost", u.Hostname()) + assert.Equal(t, "1024", u.Port()) + assert.Empty(t, u.Path) + q := u.Query() + duration, err := time.ParseDuration(q.Get("delay")) + require.NoError(t, err) + assert.Zero(t, duration) + errorProbability, err := strconv.ParseFloat(q.Get("failure-probability"), 32) + require.NoError(t, err) + assert.Zero(t, errorProbability) + } +} diff --git a/scenarios/state_transitions_steady.go b/scenarios/state_transitions_steady.go index c69c7cd..02cc6fc 100644 --- a/scenarios/state_transitions_steady.go +++ b/scenarios/state_transitions_steady.go @@ -49,11 +49,9 @@ func (s *stateTransitionsSteady) run(ctx context.Context) error { ) // Execute initial workflow and get the transition count - workflowParams := &kitchensink.TestInput{ - WorkflowInput: &kitchensink.WorkflowInput{ - InitialActions: []*kitchensink.ActionSet{ - kitchensink.NoOpSingleActivityActionSet(), - }, + workflowParams := &kitchensink.WorkflowInput{ + InitialActions: []*kitchensink.ActionSet{ + kitchensink.NoOpSingleActivityActionSet(), }, } workflowRun, err := s.Client.ExecuteWorkflow( diff --git a/scenarios/throughput_stress.go b/scenarios/throughput_stress.go index 05a4213..f990292 100644 --- a/scenarios/throughput_stress.go +++ b/scenarios/throughput_stress.go @@ -4,10 +4,12 @@ import ( "context" "errors" "fmt" - "go.temporal.io/api/workflowservice/v1" "sync/atomic" "time" + "github.com/temporalio/omes/common" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/api/enums/v1" "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/serviceerror" @@ -79,7 +81,7 @@ func (t *tpsExecutor) Run(ctx context.Context, info loadgen.ScenarioInfo) error ThroughputStressScenarioIdSearchAttribute: run.ScenarioInfo.RunID, }, }, - "throughputStress", + common.WorkflowNameThroughputStress, &result, throughputstress.WorkflowParams{ Iterations: internalIterations, diff --git a/workers/go/go.mod b/workers/go/go.mod index 01af35f..e2f2962 100644 --- a/workers/go/go.mod +++ b/workers/go/go.mod @@ -1,14 +1,16 @@ module github.com/temporalio/omes/workers/go -go 1.20 +go 1.21 + +toolchain go1.21.5 require github.com/temporalio/omes v1.0.0 require ( github.com/spf13/cobra v1.7.0 - go.temporal.io/api v1.26.1 - go.temporal.io/sdk v1.25.2-0.20231129171107-288a04f72145 - go.uber.org/zap v1.25.0 + go.temporal.io/api v1.26.1-0.20240106224952-e65d246174b6 + go.temporal.io/sdk v1.25.2-0.20240110060334-d09a3f3bfe09 + go.uber.org/zap v1.26.0 ) require ( @@ -25,6 +27,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/pborman/uuid v1.2.1 // indirect + github.com/pkg/errors v0.8.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.16.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect @@ -36,15 +39,16 @@ require ( github.com/stretchr/testify v1.8.4 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect + golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/sync v0.5.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.3.0 // indirect - google.golang.org/genproto v0.0.0-20231127180814-3a041ad873d4 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect - google.golang.org/grpc v1.59.0 // indirect + google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 // indirect + google.golang.org/grpc v1.60.1 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/workers/go/go.sum b/workers/go/go.sum index 2f7b730..e6edf9c 100644 --- a/workers/go/go.sum +++ b/workers/go/go.sum @@ -1,7 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -37,6 +36,7 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -51,14 +51,17 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw= github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -74,6 +77,7 @@ github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/ github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= +github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= @@ -98,25 +102,28 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.26.1 h1:YqGQsOr/Tx4nVdA8wCv74AxesaIzCRHWb3KkHrYqI8k= -go.temporal.io/api v1.26.1/go.mod h1:Y/rALXTprFO+bvAlAfLFoJj7KpQIcL4GDQVN6fhYIa4= -go.temporal.io/sdk v1.25.2-0.20231129171107-288a04f72145 h1:aV7tRpzB3tr9LGs4/SN7MSWSbVx+bgDYfOoGMjk4oEM= -go.temporal.io/sdk v1.25.2-0.20231129171107-288a04f72145/go.mod h1:MHw8PEOVmOJC1yduTVxYq1GsM5kkQg0sIwRST7cRHoo= +go.temporal.io/api v1.26.1-0.20240106224952-e65d246174b6 h1:HMgz/R0Zy60KD6/QS2zyWBFgUCSusXMimXUl9fIWcg4= +go.temporal.io/api v1.26.1-0.20240106224952-e65d246174b6/go.mod h1:d5Zx/HUcbL3/wmH89RNnjeV2BuOHeofAkilC4F2MuKA= +go.temporal.io/sdk v1.25.2-0.20240110060334-d09a3f3bfe09 h1:8ZFyOs0mVb4Sn+Oxqq1YlgMcIFijL/KBDKoqtG0u++Q= +go.temporal.io/sdk v1.25.2-0.20240110060334-d09a3f3bfe09/go.mod h1:Gf7Zou4O35s8/F3rmKsu0C5hXD+BYBJHHz273ji7Eiw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= -go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= -go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= +golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -182,19 +189,19 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= -google.golang.org/genproto v0.0.0-20231127180814-3a041ad873d4 h1:W12Pwm4urIbRdGhMEg2NM9O3TWKjNcxQhs46V0ypf/k= -google.golang.org/genproto v0.0.0-20231127180814-3a041ad873d4/go.mod h1:5RBcpGRxr25RbDzY5w+dmaqpSEvl8Gwl1x2CICf60ic= -google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4 h1:ZcOkrmX74HbKFYnpPY8Qsw93fC29TbJXspYKaBkSXDQ= -google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4/go.mod h1:k2dtGpRrbsSyKcNPKKI5sstZkrNCZwpU/ns96JoHbGg= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 h1:DC7wcm+i+P1rN3Ff07vL+OndGg5OhNddHyTA+ocPqYE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM= +google.golang.org/genproto v0.0.0-20231212172506-995d672761c0 h1:YJ5pD9rF8o9Qtta0Cmy9rdBwkSjrTCT6XTiUQVOtIos= +google.golang.org/genproto v0.0.0-20231212172506-995d672761c0/go.mod h1:l/k7rMz0vFTBPy+tFSGvXEd3z+BcoG1k7EHbqm+YBsY= +google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0 h1:s1w3X6gQxwrLEpxnLd/qXTVLgQE2yXwaOaoa6IlY/+o= +google.golang.org/genproto/googleapis/api v0.0.0-20231212172506-995d672761c0/go.mod h1:CAny0tYF+0/9rmDB9fahA9YLzX3+AEVl1qXbv5hhj6c= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 h1:/jFB8jK5R3Sq3i/lmeZO0cATSzFfZaJq1J2Euan3XKU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0/go.mod h1:FUoWkonphQm3RhTS+kOEhF8h0iDpm4tdXolVCeZ9KKA= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= -google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= -google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/grpc v1.60.1 h1:26+wFr+cNqSGFcOXcabYC0lUVJVRa2Sb2ortSK7VrEU= +google.golang.org/grpc v1.60.1/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= @@ -202,6 +209,7 @@ google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/workers/go/kitchensink/kitchen_sink.go b/workers/go/kitchensink/kitchen_sink.go index ed5d1a5..5300ed1 100644 --- a/workers/go/kitchensink/kitchen_sink.go +++ b/workers/go/kitchensink/kitchen_sink.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + ocommon "github.com/temporalio/omes/common" "github.com/temporalio/omes/loadgen/kitchensink" "go.temporal.io/api/common/v1" "go.temporal.io/sdk/temporal" @@ -130,7 +131,7 @@ func (ws *KSWorkflowState) handleAction( return nil, temporal.NewApplicationError(re.Failure.Message, "") } else if can := action.GetContinueAsNew(); can != nil { // Use string arg to avoid the SDK trying to convert payload to input type - return nil, workflow.NewContinueAsNewError(ctx, "kitchenSink", can.GetArguments()[0]) + return nil, workflow.NewContinueAsNewError(ctx, ocommon.WorkflowNameKitchenSink, can.GetArguments()[0]) } else if timer := action.GetTimer(); timer != nil { return nil, withAwaitableChoice(ctx, func(ctx workflow.Context) workflow.Future { fut, setter := workflow.NewFuture(ctx) @@ -144,7 +145,7 @@ func (ws *KSWorkflowState) handleAction( return nil, launchActivity(ctx, action.GetExecActivity()) } else if child := action.GetExecChildWorkflow(); child != nil { // Use name if present, otherwise use this one - childType := "kitchenSink" + childType := ocommon.WorkflowNameKitchenSink if child.WorkflowType != "" { childType = child.WorkflowType } diff --git a/workers/go/throughputstress/workflow.go b/workers/go/throughputstress/workflow.go index 5bfac88..774ac63 100644 --- a/workers/go/throughputstress/workflow.go +++ b/workers/go/throughputstress/workflow.go @@ -2,9 +2,11 @@ package throughputstress import ( "fmt" - "github.com/temporalio/omes/workers/go/workflowutils" "time" + "github.com/temporalio/omes/common" + "github.com/temporalio/omes/workers/go/workflowutils" + "github.com/temporalio/omes/loadgen/throughputstress" "github.com/temporalio/omes/scenarios" "go.temporal.io/sdk/temporal" @@ -161,7 +163,7 @@ func ThroughputStressWorkflow(ctx workflow.Context, params *throughputstress.Wor params.InitialIteration = i params.TimesContinued++ params.ChildrenSpawned = output.ChildrenSpawned - return output, workflow.NewContinueAsNewError(ctx, "throughputStress", params) + return output, workflow.NewContinueAsNewError(ctx, common.WorkflowNameThroughputStress, params) } } diff --git a/workers/go/worker/worker.go b/workers/go/worker/worker.go index 01999c1..6ddae66 100644 --- a/workers/go/worker/worker.go +++ b/workers/go/worker/worker.go @@ -5,6 +5,7 @@ import ( "github.com/spf13/cobra" "github.com/temporalio/omes/cmd/cmdoptions" + "github.com/temporalio/omes/common" "github.com/temporalio/omes/workers/go/kitchensink" "github.com/temporalio/omes/workers/go/throughputstress" "go.temporal.io/sdk/activity" @@ -66,10 +67,10 @@ func runWorkers(client client.Client, taskQueues []string, options cmdoptions.Wo MaxConcurrentActivityTaskPollers: options.MaxConcurrentActivityPollers, MaxConcurrentWorkflowTaskPollers: options.MaxConcurrentWorkflowPollers, }) - w.RegisterWorkflowWithOptions(kitchensink.KitchenSinkWorkflow, workflow.RegisterOptions{Name: "kitchenSink"}) + w.RegisterWorkflowWithOptions(kitchensink.KitchenSinkWorkflow, workflow.RegisterOptions{Name: common.WorkflowNameKitchenSink}) w.RegisterActivityWithOptions(kitchensink.Noop, activity.RegisterOptions{Name: "noop"}) w.RegisterActivityWithOptions(kitchensink.Delay, activity.RegisterOptions{Name: "delay"}) - w.RegisterWorkflowWithOptions(throughputstress.ThroughputStressWorkflow, workflow.RegisterOptions{Name: "throughputStress"}) + w.RegisterWorkflowWithOptions(throughputstress.ThroughputStressWorkflow, workflow.RegisterOptions{Name: common.WorkflowNameThroughputStress}) w.RegisterWorkflow(throughputstress.ThroughputStressChild) w.RegisterActivity(&tpsActivities) errCh <- w.Run(worker.InterruptCh())