Skip to content

Commit

Permalink
chore: NPE (incorrent payload assigment)
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <[email protected]>
  • Loading branch information
rustatian committed Aug 2, 2023
1 parent 445d146 commit af27c47
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 21 deletions.
11 changes: 10 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
{
"cSpell.words": ["replayer"]
"cSpell.words": [
"commonpb",
"errn",
"jsonpb",
"nolintlint",
"protobuf",
"replayer",
"stylecheck",
"temporalio"
]
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.20
require (
github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c
github.com/goccy/go-json v0.10.2
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.3.0
github.com/roadrunner-server/api/v4 v4.6.1
github.com/roadrunner-server/endure/v2 v2.3.1
Expand Down Expand Up @@ -35,6 +34,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/gogo/status v1.1.1 // indirect
github.com/golang/mock v1.7.0-rc.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/pborman/uuid v1.2.1 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1129,8 +1129,6 @@ go.temporal.io/api v1.23.1-0.20230721162852-f326b9afc8e1 h1:IMMihad9mxEsZHEnw6uH
go.temporal.io/api v1.23.1-0.20230721162852-f326b9afc8e1/go.mod h1:Ezq+jBRcsvaZTbJaYzFhsl7r3c24YSI35lm0Af5rJG4=
go.temporal.io/sdk/contrib/tally v0.2.0 h1:XnTJIQcjOv+WuCJ1u8Ve2nq+s2H4i/fys34MnWDRrOo=
go.temporal.io/sdk/contrib/tally v0.2.0/go.mod h1:1kpSuCms/tHeJQDPuuKkaBsMqfHnIIRnCtUYlPNXxuE=
go.temporal.io/server v1.21.3 h1:vtGGCJJ6pG31JC64mRcjkdNYUxIdapMaPWPY1qMEuVA=
go.temporal.io/server v1.21.3/go.mod h1:o3azXry/6fuvSuXSpPmeCmkUmZ0RuFYgULJu67qwRZY=
go.temporal.io/server v1.21.4 h1:lW32GaqHEWWveeEYKWFzqpfQTws5OXSVUvB3hEofCYc=
go.temporal.io/server v1.21.4/go.mod h1:o3azXry/6fuvSuXSpPmeCmkUmZ0RuFYgULJu67qwRZY=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down
48 changes: 31 additions & 17 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import (
"time"

"github.com/gogo/protobuf/jsonpb"
v1Proto "github.com/golang/protobuf/proto" //nolint:staticcheck,nolintlint
"github.com/roadrunner-server/api/v4/build/common/v1"
commonpb "github.com/roadrunner-server/api/v4/build/temporal/api/common/v1"
protoApi "github.com/roadrunner-server/api/v4/build/temporal/v1"
"github.com/temporalio/roadrunner-temporal/v4/internal/logger"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/history/v1"
"go.temporal.io/sdk/activity"
Expand Down Expand Up @@ -56,7 +55,7 @@ func (r *rpc) RecordActivityHeartbeat(in RecordHeartbeatRequest, out *RecordHear
details := &commonpb.Payloads{}

if len(in.Details) != 0 {
if err := proto.Unmarshal(in.Details, v1Proto.MessageV2(details)); err != nil {
if err := proto.Unmarshal(in.Details, details); err != nil {
return err
}
}
Expand Down Expand Up @@ -109,7 +108,7 @@ func (r *rpc) ReplayWorkflow(in *protoApi.ReplayRequest, out *protoApi.ReplayRes
zap.String("workflow_name", in.GetWorkflowType().GetName()))

if in.GetWorkflowExecution() == nil || in.GetWorkflowType() == nil {
*out.Status = common.Status{
out.Status = &common.Status{
Code: int32(codes.FailedPrecondition),
Message: "run_id, workflow_id or workflow_name should not be empty",
}
Expand All @@ -124,21 +123,23 @@ func (r *rpc) ReplayWorkflow(in *protoApi.ReplayRequest, out *protoApi.ReplayRes
zap.String("workflow_name", in.GetWorkflowType().GetName()))

if in.GetWorkflowExecution().GetRunId() == "" || in.GetWorkflowExecution().GetWorkflowId() == "" || in.GetWorkflowType().GetName() == "" {
*out.Status = common.Status{
out.Status = &common.Status{
Code: int32(codes.FailedPrecondition),
Message: "run_id, workflow_id or workflow_name should not be empty",
}

r.plugin.log.Error("replay workflow request", zap.String("error", "run_id, workflow_id or workflow_name should not be empty"))
return nil
}

if r.plugin.rrWorkflowDef == nil {
*out.Status = common.Status{
out.Status = &common.Status{
Code: int32(codes.FailedPrecondition),
Message: "workflow definition is not initialized",
}

r.plugin.log.Error("replay workflow request", zap.String("error", "workflow definition is not initialized"))
return nil
}

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
Expand All @@ -149,7 +150,7 @@ func (r *rpc) ReplayWorkflow(in *protoApi.ReplayRequest, out *protoApi.ReplayRes
for iter.HasNext() {
event, err := iter.Next()
if err != nil {
*out.Status = common.Status{
out.Status = &common.Status{
Code: int32(codes.Internal),
Message: err.Error(),
}
Expand All @@ -168,7 +169,7 @@ func (r *rpc) ReplayWorkflow(in *protoApi.ReplayRequest, out *protoApi.ReplayRes

err := replayer.ReplayWorkflowHistory(logger.NewZapAdapter(r.plugin.log), &hist)
if err != nil {
*out.Status = common.Status{
out.Status = &common.Status{
Code: int32(codes.Internal),
Message: err.Error(),
}
Expand All @@ -177,6 +178,10 @@ func (r *rpc) ReplayWorkflow(in *protoApi.ReplayRequest, out *protoApi.ReplayRes
return nil
}

out.Status = &common.Status{
Code: int32(codes.OK),
}

return nil
}

Expand All @@ -187,7 +192,7 @@ func (r *rpc) DownloadWorkflowHistory(in *protoApi.ReplayRequest, out *protoApi.
zap.String("save_path", in.GetSavePath()))

if in.GetWorkflowExecution() == nil || in.GetWorkflowType() == nil || in.GetSavePath() == "" {
*out.Status = common.Status{
out.Status = &common.Status{
Code: int32(codes.FailedPrecondition),
Message: "run_id, workflow_id or save_path should not be empty",
}
Expand All @@ -201,7 +206,7 @@ func (r *rpc) DownloadWorkflowHistory(in *protoApi.ReplayRequest, out *protoApi.
zap.String("save_path", in.GetSavePath()))

if in.GetWorkflowExecution().GetRunId() == "" || in.GetWorkflowExecution().GetWorkflowId() == "" || in.GetWorkflowType().GetName() == "" {
*out.Status = common.Status{
out.Status = &common.Status{
Code: int32(codes.FailedPrecondition),
Message: "run_id, workflow_id or save_path should not be empty",
}
Expand All @@ -212,7 +217,7 @@ func (r *rpc) DownloadWorkflowHistory(in *protoApi.ReplayRequest, out *protoApi.

file, err := os.Create(in.GetSavePath())
if err != nil {
*out.Status = common.Status{
out.Status = &common.Status{
Code: int32(codes.Internal),
Message: err.Error(),
}
Expand All @@ -237,7 +242,7 @@ func (r *rpc) DownloadWorkflowHistory(in *protoApi.ReplayRequest, out *protoApi.
for iter.HasNext() {
event, errn := iter.Next()
if errn != nil {
*out.Status = common.Status{
out.Status = &common.Status{
Code: int32(codes.Internal),
Message: errn.Error(),
}
Expand All @@ -252,7 +257,7 @@ func (r *rpc) DownloadWorkflowHistory(in *protoApi.ReplayRequest, out *protoApi.
marshaler := jsonpb.Marshaler{}
err = marshaler.Marshal(file, &hist)
if err != nil {
*out.Status = common.Status{
out.Status = &common.Status{
Code: int32(codes.Internal),
Message: err.Error(),
}
Expand All @@ -261,6 +266,9 @@ func (r *rpc) DownloadWorkflowHistory(in *protoApi.ReplayRequest, out *protoApi.
return nil
}

out.Status = &common.Status{
Code: int32(codes.OK),
}
r.plugin.log.Debug("history saved", zap.String("location", in.GetSavePath()))

return nil
Expand All @@ -272,7 +280,7 @@ func (r *rpc) ReplayFromJSON(in *protoApi.ReplayRequest, out *protoApi.ReplayRes
zap.String("save_path", in.GetSavePath()))

if in.GetWorkflowType() == nil || in.GetSavePath() == "" {
*out.Status = common.Status{
out.Status = &common.Status{
Code: int32(codes.FailedPrecondition),
Message: "workflow_name and save_path should not be empty",
}
Expand All @@ -286,7 +294,7 @@ func (r *rpc) ReplayFromJSON(in *protoApi.ReplayRequest, out *protoApi.ReplayRes
zap.String("save_path", in.GetSavePath()))

if in.GetWorkflowType().GetName() == "" {
*out.Status = common.Status{
out.Status = &common.Status{
Code: int32(codes.FailedPrecondition),
Message: "workflow_name should not be empty",
}
Expand All @@ -306,25 +314,31 @@ func (r *rpc) ReplayFromJSON(in *protoApi.ReplayRequest, out *protoApi.ReplayRes
case 0:
err := replayer.ReplayWorkflowHistoryFromJSONFile(logger.NewZapAdapter(r.plugin.log), in.GetSavePath())
if err != nil {
*out.Status = common.Status{
out.Status = &common.Status{
Code: int32(codes.Internal),
Message: err.Error(),
}

r.plugin.log.Error("replay from JSON request", zap.Error(err))
return nil
}
default:
// we have last event ID
err := replayer.ReplayPartialWorkflowHistoryFromJSONFile(logger.NewZapAdapter(r.plugin.log), in.GetSavePath(), in.GetLastEventId())
if err != nil {
*out.Status = common.Status{
out.Status = &common.Status{
Code: int32(codes.Internal),
Message: err.Error(),
}

r.plugin.log.Error("replay from JSON request (partial workflow history)", zap.Error(err))
return nil
}
}

out.Status = &common.Status{
Code: int32(codes.OK),
}

return nil
}

0 comments on commit af27c47

Please sign in to comment.