Skip to content

Commit

Permalink
feature: Add Replayer API and Update workflow handler
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Jul 31, 2023
1 parent 844ba58 commit cf7b8ff
Show file tree
Hide file tree
Showing 14 changed files with 452 additions and 400 deletions.
3 changes: 0 additions & 3 deletions .gitmodules

This file was deleted.

5 changes: 1 addition & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,4 @@ test_coverage:
mkdir ./coverage-ci
go test -v -race -cover -tags=debug -failfast -coverpkg=./... -coverprofile=./coverage-ci/temporal.out -covermode=atomic ./...
echo 'mode: atomic' > ./coverage-ci/summary.txt
tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt

generate-proto:
protoc -I./proto/api -I./proto --go_out=proto/protocol/v1 ./proto/protocol/v1/protocol.proto
tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt
43 changes: 40 additions & 3 deletions aggregatedpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ import (
"go.uber.org/zap"
)

type res string

const (
completed string = "completed"

Complete res = "Complete"
Reject res = "Reject"
)

// execution context.
Expand All @@ -27,6 +32,7 @@ func (wp *Workflow) getContext() *internal.Context {
TickTime: wp.env.Now().Format(time.RFC3339),
Replay: wp.env.IsReplaying(),
HistoryLen: wp.env.WorkflowInfo().GetCurrentHistoryLength(),
RrID: wp.rrID,
}
}

Expand All @@ -53,11 +59,36 @@ func (wp *Workflow) handleSignal(name string, input *commonpb.Payloads, header *
return nil
}

func (wp *Workflow) handleUpdate(updateName string, id string, pld *commonpb.Payloads, hdr *commonpb.Header, callbacks bindings.UpdateCallbacks) {
result, err := wp.runCommand(internal.InvokeUpdate{
RunID: wp.env.WorkflowInfo().WorkflowExecution.RunID,
Name: updateName,
UpdateID: id,
}, pld, hdr)
if result == nil {
if err != nil {
callbacks.Reject(err)
return
}
callbacks.Reject(errors.Str("no command provided"))
return
}
switch result.Command {
case Complete:
callbacks.Accept()
callbacks.Complete(result.Payloads, err)
case Reject:
callbacks.Reject(err)
default:
callbacks.Reject(errors.Str("no command provided"))
}
}

// Handle query in blocking mode.
func (wp *Workflow) handleQuery(queryType string, queryArgs *commonpb.Payloads, header *commonpb.Header) (*commonpb.Payloads, error) {
const op = errors.Op("workflow_process_handle_query")
result, err := wp.runCommand(internal.InvokeQuery{
RunID: wp.runID,
RunID: wp.env.WorkflowInfo().WorkflowExecution.RunID,
Name: queryType,
}, queryArgs, header)

Expand Down Expand Up @@ -384,35 +415,41 @@ func (wp *Workflow) flushQueue() error {
// Run single command and return single result.
func (wp *Workflow) runCommand(cmd any, payloads *commonpb.Payloads, header *commonpb.Header) (*internal.Message, error) {
const op = errors.Op("workflow_process_runcommand")
msg := wp.mq.AllocateMessage(cmd, payloads, header)
msg := &internal.Message{}
wp.mq.AllocateMessage(cmd, payloads, header, msg)

if wp.mh != nil {
wp.mh.Gauge(RrMetricName).Update(float64(wp.pool.QueueSize()))
defer wp.mh.Gauge(RrMetricName).Update(float64(wp.pool.QueueSize()))
}

pld := &payload.Payload{}
pld := wp.getPld()
err := wp.codec.Encode(wp.getContext(), pld, msg)
if err != nil {
wp.putPld(pld)
return nil, err
}

// todo(rustatian): do we need a timeout here??
resp, err := wp.pool.Exec(context.Background(), pld)
if err != nil {
wp.putPld(pld)
return nil, err
}

msgs := make([]*internal.Message, 0, 2)
err = wp.codec.Decode(resp, &msgs)
if err != nil {
wp.putPld(pld)
return nil, err
}

if len(msgs) != 1 {
wp.putPld(pld)
return nil, errors.E(op, errors.Str("unexpected pool response"))
}

wp.putPld(pld)
return msgs[0], nil
}

Expand Down
10 changes: 7 additions & 3 deletions aggregatedpool/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/roadrunner-server/sdk/v4/payload"
"github.com/temporalio/roadrunner-temporal/v4/canceller"
"github.com/temporalio/roadrunner-temporal/v4/common"
Expand Down Expand Up @@ -42,13 +43,13 @@ func seq() uint64 {
type Workflow struct {
codec common.Codec
pool common.Pool
rrID string

env bindings.WorkflowEnvironment
header *commonpb.Header
mq *queue.MessageQueue
ids *registry.IDRegistry
seqID uint64
runID string
pipeline []*internal.Message
callbacks []Callback
canceller *canceller.Canceller
Expand All @@ -61,9 +62,9 @@ type Workflow struct {
pldPool *sync.Pool
}

// RoadRunner function
func NewWorkflowDefinition(codec common.Codec, pool common.Pool, log *zap.Logger) *Workflow {
return &Workflow{
rrID: uuid.NewString(),
log: log,
codec: codec,
pool: pool,
Expand All @@ -79,6 +80,7 @@ func NewWorkflowDefinition(codec common.Codec, pool common.Pool, log *zap.Logger
// DO NOT USE THIS FUNCTION DIRECTLY!!!!
func (wp *Workflow) NewWorkflowDefinition() bindings.WorkflowDefinition {
return &Workflow{
rrID: uuid.NewString(),
pool: wp.pool,
codec: wp.codec,
log: wp.log,
Expand All @@ -98,7 +100,6 @@ func (wp *Workflow) Execute(env bindings.WorkflowEnvironment, header *commonpb.H
wp.env = env
wp.header = header
wp.seqID = 0
wp.runID = env.WorkflowInfo().WorkflowExecution.RunID
wp.canceller = new(canceller.Canceller)

// sequenceID shared for all pool workflows
Expand All @@ -108,6 +109,7 @@ func (wp *Workflow) Execute(env bindings.WorkflowEnvironment, header *commonpb.H
env.RegisterCancelHandler(wp.handleCancel)
env.RegisterSignalHandler(wp.handleSignal)
env.RegisterQueryHandler(wp.handleQuery)
env.RegisterUpdateHandler(wp.handleUpdate)

var lastCompletion = bindings.GetLastCompletionResult(env)
var lastCompletionOffset = 0
Expand Down Expand Up @@ -167,13 +169,15 @@ func (wp *Workflow) OnWorkflowTaskStarted(t time.Duration) {

if msg.IsCommand() {
if msg.UndefinedResponse() {
wp.pipeline = nil
panic(fmt.Sprintf("undefined response: %s", msg.Command.(*internal.UndefinedResponse).Message))
}

err = wp.handleMessage(msg)
}

if err != nil {
wp.pipeline = nil
panic(err)
}
}
Expand Down
20 changes: 11 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,32 @@ require (
github.com/goccy/go-json v0.10.2
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.3.0
github.com/roadrunner-server/endure/v2 v2.2.1
github.com/roadrunner-server/api/v4 v4.6.0
github.com/roadrunner-server/endure/v2 v2.3.1
github.com/roadrunner-server/errors v1.2.0
github.com/roadrunner-server/sdk/v4 v4.3.1
github.com/stretchr/testify v1.8.4
github.com/uber-go/tally/v4 v4.1.6
go.temporal.io/api v1.23.0
go.temporal.io/api v1.23.1-0.20230721162852-f326b9afc8e1
go.temporal.io/sdk v1.23.1
go.temporal.io/sdk/contrib/tally v0.2.0
go.temporal.io/server v1.21.1
go.temporal.io/server v1.21.3
go.uber.org/zap v1.24.0
google.golang.org/protobuf v1.31.0
)

require google.golang.org/genproto/googleapis/rpc v0.0.0-20230629202037-9506855d4529 // indirect

exclude github.com/uber-go/tally/v4 v4.1.7

replace go.temporal.io/sdk => ../../../../../temp/sdk-go

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogo/protobuf v1.3.2
github.com/gogo/status v1.1.1 // indirect
github.com/golang/mock v1.7.0-rc.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
Expand All @@ -42,7 +43,7 @@ require (
github.com/prometheus/client_golang v1.16.0
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/roadrunner-server/goridge/v3 v3.6.3 // indirect
github.com/roadrunner-server/tcplisten v1.3.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
Expand All @@ -54,11 +55,12 @@ require (
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.11.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/grpc v1.56.2
google.golang.org/genproto/googleapis/rpc v0.0.0-20230726155614-23370e0ffb3e // indirect
google.golang.org/grpc v1.57.0
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit cf7b8ff

Please sign in to comment.