diff --git a/.vscode/settings.json b/.vscode/settings.json index fbb65db2..9f18a41d 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,12 @@ { - "cSpell.words": ["replayer"] + "cSpell.words": [ + "commonpb", + "errn", + "jsonpb", + "nolintlint", + "protobuf", + "replayer", + "stylecheck", + "temporalio" + ] } diff --git a/go.mod b/go.mod index 4a2456fa..c38a894c 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.20 require ( github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c github.com/goccy/go-json v0.10.2 - github.com/golang/protobuf v1.5.3 github.com/google/uuid v1.3.0 github.com/roadrunner-server/api/v4 v4.6.1 github.com/roadrunner-server/endure/v2 v2.3.1 @@ -35,6 +34,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/gogo/status v1.1.1 // indirect github.com/golang/mock v1.7.0-rc.1 // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/pborman/uuid v1.2.1 // indirect diff --git a/go.sum b/go.sum index 12af899d..747687ae 100644 --- a/go.sum +++ b/go.sum @@ -1129,8 +1129,6 @@ go.temporal.io/api v1.23.1-0.20230721162852-f326b9afc8e1 h1:IMMihad9mxEsZHEnw6uH go.temporal.io/api v1.23.1-0.20230721162852-f326b9afc8e1/go.mod h1:Ezq+jBRcsvaZTbJaYzFhsl7r3c24YSI35lm0Af5rJG4= go.temporal.io/sdk/contrib/tally v0.2.0 h1:XnTJIQcjOv+WuCJ1u8Ve2nq+s2H4i/fys34MnWDRrOo= go.temporal.io/sdk/contrib/tally v0.2.0/go.mod h1:1kpSuCms/tHeJQDPuuKkaBsMqfHnIIRnCtUYlPNXxuE= -go.temporal.io/server v1.21.3 h1:vtGGCJJ6pG31JC64mRcjkdNYUxIdapMaPWPY1qMEuVA= -go.temporal.io/server v1.21.3/go.mod h1:o3azXry/6fuvSuXSpPmeCmkUmZ0RuFYgULJu67qwRZY= go.temporal.io/server v1.21.4 h1:lW32GaqHEWWveeEYKWFzqpfQTws5OXSVUvB3hEofCYc= go.temporal.io/server v1.21.4/go.mod h1:o3azXry/6fuvSuXSpPmeCmkUmZ0RuFYgULJu67qwRZY= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/rpc.go b/rpc.go index c91f5135..ee29e954 100644 --- a/rpc.go +++ b/rpc.go @@ -6,11 +6,10 @@ import ( "time" "github.com/gogo/protobuf/jsonpb" - v1Proto "github.com/golang/protobuf/proto" //nolint:staticcheck,nolintlint "github.com/roadrunner-server/api/v4/build/common/v1" + commonpb "github.com/roadrunner-server/api/v4/build/temporal/api/common/v1" protoApi "github.com/roadrunner-server/api/v4/build/temporal/v1" "github.com/temporalio/roadrunner-temporal/v4/internal/logger" - commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/enums/v1" "go.temporal.io/api/history/v1" "go.temporal.io/sdk/activity" @@ -56,7 +55,7 @@ func (r *rpc) RecordActivityHeartbeat(in RecordHeartbeatRequest, out *RecordHear details := &commonpb.Payloads{} if len(in.Details) != 0 { - if err := proto.Unmarshal(in.Details, v1Proto.MessageV2(details)); err != nil { + if err := proto.Unmarshal(in.Details, details); err != nil { return err } } @@ -109,7 +108,7 @@ func (r *rpc) ReplayWorkflow(in *protoApi.ReplayRequest, out *protoApi.ReplayRes zap.String("workflow_name", in.GetWorkflowType().GetName())) if in.GetWorkflowExecution() == nil || in.GetWorkflowType() == nil { - *out.Status = common.Status{ + out.Status = &common.Status{ Code: int32(codes.FailedPrecondition), Message: "run_id, workflow_id or workflow_name should not be empty", } @@ -124,21 +123,23 @@ func (r *rpc) ReplayWorkflow(in *protoApi.ReplayRequest, out *protoApi.ReplayRes zap.String("workflow_name", in.GetWorkflowType().GetName())) if in.GetWorkflowExecution().GetRunId() == "" || in.GetWorkflowExecution().GetWorkflowId() == "" || in.GetWorkflowType().GetName() == "" { - *out.Status = common.Status{ + out.Status = &common.Status{ Code: int32(codes.FailedPrecondition), Message: "run_id, workflow_id or workflow_name should not be empty", } r.plugin.log.Error("replay workflow request", zap.String("error", "run_id, workflow_id or workflow_name should not be empty")) + return nil } if r.plugin.rrWorkflowDef == nil { - *out.Status = common.Status{ + out.Status = &common.Status{ Code: int32(codes.FailedPrecondition), Message: "workflow definition is not initialized", } r.plugin.log.Error("replay workflow request", zap.String("error", "workflow definition is not initialized")) + return nil } ctx, cancel := context.WithTimeout(context.Background(), time.Minute) @@ -149,7 +150,7 @@ func (r *rpc) ReplayWorkflow(in *protoApi.ReplayRequest, out *protoApi.ReplayRes for iter.HasNext() { event, err := iter.Next() if err != nil { - *out.Status = common.Status{ + out.Status = &common.Status{ Code: int32(codes.Internal), Message: err.Error(), } @@ -168,7 +169,7 @@ func (r *rpc) ReplayWorkflow(in *protoApi.ReplayRequest, out *protoApi.ReplayRes err := replayer.ReplayWorkflowHistory(logger.NewZapAdapter(r.plugin.log), &hist) if err != nil { - *out.Status = common.Status{ + out.Status = &common.Status{ Code: int32(codes.Internal), Message: err.Error(), } @@ -177,6 +178,10 @@ func (r *rpc) ReplayWorkflow(in *protoApi.ReplayRequest, out *protoApi.ReplayRes return nil } + out.Status = &common.Status{ + Code: int32(codes.OK), + } + return nil } @@ -187,7 +192,7 @@ func (r *rpc) DownloadWorkflowHistory(in *protoApi.ReplayRequest, out *protoApi. zap.String("save_path", in.GetSavePath())) if in.GetWorkflowExecution() == nil || in.GetWorkflowType() == nil || in.GetSavePath() == "" { - *out.Status = common.Status{ + out.Status = &common.Status{ Code: int32(codes.FailedPrecondition), Message: "run_id, workflow_id or save_path should not be empty", } @@ -201,7 +206,7 @@ func (r *rpc) DownloadWorkflowHistory(in *protoApi.ReplayRequest, out *protoApi. zap.String("save_path", in.GetSavePath())) if in.GetWorkflowExecution().GetRunId() == "" || in.GetWorkflowExecution().GetWorkflowId() == "" || in.GetWorkflowType().GetName() == "" { - *out.Status = common.Status{ + out.Status = &common.Status{ Code: int32(codes.FailedPrecondition), Message: "run_id, workflow_id or save_path should not be empty", } @@ -212,7 +217,7 @@ func (r *rpc) DownloadWorkflowHistory(in *protoApi.ReplayRequest, out *protoApi. file, err := os.Create(in.GetSavePath()) if err != nil { - *out.Status = common.Status{ + out.Status = &common.Status{ Code: int32(codes.Internal), Message: err.Error(), } @@ -237,7 +242,7 @@ func (r *rpc) DownloadWorkflowHistory(in *protoApi.ReplayRequest, out *protoApi. for iter.HasNext() { event, errn := iter.Next() if errn != nil { - *out.Status = common.Status{ + out.Status = &common.Status{ Code: int32(codes.Internal), Message: errn.Error(), } @@ -252,7 +257,7 @@ func (r *rpc) DownloadWorkflowHistory(in *protoApi.ReplayRequest, out *protoApi. marshaler := jsonpb.Marshaler{} err = marshaler.Marshal(file, &hist) if err != nil { - *out.Status = common.Status{ + out.Status = &common.Status{ Code: int32(codes.Internal), Message: err.Error(), } @@ -261,6 +266,9 @@ func (r *rpc) DownloadWorkflowHistory(in *protoApi.ReplayRequest, out *protoApi. return nil } + out.Status = &common.Status{ + Code: int32(codes.OK), + } r.plugin.log.Debug("history saved", zap.String("location", in.GetSavePath())) return nil @@ -272,7 +280,7 @@ func (r *rpc) ReplayFromJSON(in *protoApi.ReplayRequest, out *protoApi.ReplayRes zap.String("save_path", in.GetSavePath())) if in.GetWorkflowType() == nil || in.GetSavePath() == "" { - *out.Status = common.Status{ + out.Status = &common.Status{ Code: int32(codes.FailedPrecondition), Message: "workflow_name and save_path should not be empty", } @@ -286,7 +294,7 @@ func (r *rpc) ReplayFromJSON(in *protoApi.ReplayRequest, out *protoApi.ReplayRes zap.String("save_path", in.GetSavePath())) if in.GetWorkflowType().GetName() == "" { - *out.Status = common.Status{ + out.Status = &common.Status{ Code: int32(codes.FailedPrecondition), Message: "workflow_name should not be empty", } @@ -306,25 +314,31 @@ func (r *rpc) ReplayFromJSON(in *protoApi.ReplayRequest, out *protoApi.ReplayRes case 0: err := replayer.ReplayWorkflowHistoryFromJSONFile(logger.NewZapAdapter(r.plugin.log), in.GetSavePath()) if err != nil { - *out.Status = common.Status{ + out.Status = &common.Status{ Code: int32(codes.Internal), Message: err.Error(), } r.plugin.log.Error("replay from JSON request", zap.Error(err)) + return nil } default: // we have last event ID err := replayer.ReplayPartialWorkflowHistoryFromJSONFile(logger.NewZapAdapter(r.plugin.log), in.GetSavePath(), in.GetLastEventId()) if err != nil { - *out.Status = common.Status{ + out.Status = &common.Status{ Code: int32(codes.Internal), Message: err.Error(), } r.plugin.log.Error("replay from JSON request (partial workflow history)", zap.Error(err)) + return nil } } + out.Status = &common.Status{ + Code: int32(codes.OK), + } + return nil }