Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tdeebswihart committed Oct 9, 2023
1 parent 82893a2 commit f55b915
Show file tree
Hide file tree
Showing 30 changed files with 436 additions and 201 deletions.
17 changes: 16 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,5 +705,20 @@ type HistoryJSONOptions struct {
// HistoryFromJSON deserializes history from a reader of JSON bytes. This does
// not close the reader if it is closeable.
func HistoryFromJSON(r io.Reader, options HistoryJSONOptions) (*historypb.History, error) {
return historypb.LoadFromJSON(r, options.LastEventID)
hist, err := historypb.LoadFromJSON(r)
if err != nil {
return nil, err
}

// If there is a last event ID, slice the rest off
if options.LastEventID > 0 {
for i, event := range hist.Events {
if event.EventId == options.LastEventID {
// Inclusive
hist.Events = hist.Events[:i+1]
break
}
}
}
return hist, nil
}
2 changes: 1 addition & 1 deletion contrib/datadog/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ require (
)

replace (
go.temporal.io/api => github.com/tdeebswihart/temporal-api-go v0.0.0-20231003190133-d410a2a8e043
go.temporal.io/api => github.com/tdeebswihart/temporal-api-go v0.0.0-20231009210256-ec12a7f8f043
go.temporal.io/sdk => ../../
)
49 changes: 42 additions & 7 deletions contrib/datadog/go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contrib/opentelemetry/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ require (
)

replace (
go.temporal.io/api => github.com/tdeebswihart/temporal-api-go v0.0.0-20231003190133-d410a2a8e043
go.temporal.io/api => github.com/tdeebswihart/temporal-api-go v0.0.0-20231009210256-ec12a7f8f043
go.temporal.io/sdk => ../../
)
49 changes: 42 additions & 7 deletions contrib/opentelemetry/go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contrib/opentracing/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ require (
)

replace (
go.temporal.io/api => github.com/tdeebswihart/temporal-api-go v0.0.0-20231003190133-d410a2a8e043
go.temporal.io/api => github.com/tdeebswihart/temporal-api-go v0.0.0-20231009210256-ec12a7f8f043
go.temporal.io/sdk => ../../
)
49 changes: 42 additions & 7 deletions contrib/opentracing/go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contrib/tally/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ require (
)

replace (
go.temporal.io/api => github.com/tdeebswihart/temporal-api-go v0.0.0-20231003190133-d410a2a8e043
go.temporal.io/api => github.com/tdeebswihart/temporal-api-go v0.0.0-20231009210256-ec12a7f8f043
go.temporal.io/sdk => ../../
)
49 changes: 42 additions & 7 deletions contrib/tally/go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contrib/tools/workflowcheck/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ require (
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
)

replace go.temporal.io/api => github.com/tdeebswihart/temporal-api v0.0.0-20231003190030-5b228be4548b
replace go.temporal.io/api => github.com/tdeebswihart/temporal-api v0.0.0-20231009181144-860b4c805880
13 changes: 6 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ module go.temporal.io/sdk

go 1.20

replace go.temporal.io/api => github.com/tdeebswihart/temporal-api-go v0.0.0-20231003190133-d410a2a8e043
replace go.temporal.io/api => github.com/tdeebswihart/temporal-api-go v0.0.0-20231009210256-ec12a7f8f043

require (
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0
github.com/google/go-cmp v0.5.9
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/pborman/uuid v1.2.1
github.com/robfig/cron v1.2.0
github.com/stretchr/testify v1.8.4
go.temporal.io/api v1.24.0
go.uber.org/atomic v1.9.0
golang.org/x/sys v0.12.0
golang.org/x/sys v0.13.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.58.2
google.golang.org/protobuf v1.31.0
Expand All @@ -29,10 +28,10 @@ require (
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/net v0.16.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto v0.0.0-20231009173412-8bfb1ae86b6c // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231009173412-8bfb1ae86b6c // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
49 changes: 42 additions & 7 deletions go.sum

Large diffs are not rendered by default.

12 changes: 5 additions & 7 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import (
"time"

commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/types/duration"
"go.temporal.io/api/types/timestamp"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/sdk/converter"
Expand Down Expand Up @@ -256,11 +254,11 @@ func WithActivityTask(
contextPropagators []ContextPropagator,
interceptors []WorkerInterceptor,
) (context.Context, error) {
scheduled := timestamp.Value(task.GetScheduledTime())
started := timestamp.Value(task.GetStartedTime())
scheduleToCloseTimeout := duration.Value(task.GetScheduleToCloseTimeout())
startToCloseTimeout := duration.Value(task.GetStartToCloseTimeout())
heartbeatTimeout := duration.Value(task.GetHeartbeatTimeout())
scheduled := task.GetScheduledTime().AsTime()
started := task.GetStartedTime().AsTime()
scheduleToCloseTimeout := task.GetScheduleToCloseTimeout().AsDuration()
startToCloseTimeout := task.GetStartToCloseTimeout().AsDuration()
heartbeatTimeout := task.GetHeartbeatTimeout().AsDuration()
deadline := calculateActivityDeadline(scheduled, started, scheduleToCloseTimeout, startToCloseTimeout)

logger = log.With(logger,
Expand Down
2 changes: 1 addition & 1 deletion internal/common/serializer/jsonpb.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewJSONPBIndentEncoder(indent string) JSONPBEncoder {

// Encode protobuf struct to bytes.
func (e JSONPBEncoder) Encode(pb proto.Message) ([]byte, error) {
return protojson.Marshal(pb)
return e.opts.Marshal(pb)
}

// Decode bytes to protobuf struct.
Expand Down
4 changes: 3 additions & 1 deletion internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"reflect"
"time"

"google.golang.org/protobuf/proto"

commonpb "go.temporal.io/api/common/v1"

"go.temporal.io/sdk/converter"
Expand Down Expand Up @@ -311,7 +313,7 @@ func setActivityParametersIfNotExist(ctx Context) Context {
if params != nil {
newParams = *params
if params.RetryPolicy != nil {
newParams.RetryPolicy = copyPBRetryPolicy(params.RetryPolicy)
newParams.RetryPolicy = proto.Clone(params.RetryPolicy).(*commonpb.RetryPolicy)
}
}
return WithValue(ctx, activityOptionsContextKey, &newParams)
Expand Down
11 changes: 3 additions & 8 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
historypb "go.temporal.io/api/history/v1"
protocolpb "go.temporal.io/api/protocol/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/types/timestamp"
"go.temporal.io/api/workflowservice/v1"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -1074,7 +1073,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
// No Operation
case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED:
// Set replay clock.
weh.SetCurrentReplayTime(timestamp.Value(event.GetEventTime()))
weh.SetCurrentReplayTime(event.GetEventTime().AsTime())
// Update workflow info fields
weh.workflowInfo.currentHistoryLength = int(event.EventId)
weh.workflowInfo.continueAsNewSuggested = event.GetWorkflowTaskStartedEventAttributes().GetSuggestContinueAsNew()
Expand Down Expand Up @@ -1257,16 +1256,12 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessQuery(
return nil, err
}

bs, err := proto.Marshal(result)
if err != nil {
return nil, fmt.Errorf("failed to check size of query result: %w", err)
}
if len(bs) > queryResultSizeLimit {
if result.Size() > queryResultSizeLimit {
weh.logger.Error("Query result size exceeds limit.",
tagQueryType, queryType,
tagWorkflowID, weh.workflowInfo.WorkflowExecution.ID,
tagRunID, weh.workflowInfo.WorkflowExecution.RunID)
return nil, fmt.Errorf("query result size (%v) exceeds limit (%v)", len(bs), queryResultSizeLimit)
return nil, fmt.Errorf("query result size (%v) exceeds limit (%v)", result.Size(), queryResultSizeLimit)
}

return result, nil
Expand Down
44 changes: 21 additions & 23 deletions internal/internal_schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ import (
schedulepb "go.temporal.io/api/schedule/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/types/duration"
"go.temporal.io/api/types/timestamp"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/converter"
Expand Down Expand Up @@ -395,23 +393,23 @@ func convertToPBScheduleSpec(scheduleSpec *ScheduleSpec) *schedulepb.ScheduleSpe

skip := convertToPBScheduleCalendarSpecList(scheduleSpec.Skip)

var startTime time.Time
var startTime *timestamppb.Timestamp
if !scheduleSpec.StartAt.IsZero() {
startTime = scheduleSpec.StartAt
startTime = timestamppb.New(scheduleSpec.StartAt)
}

var endTime time.Time
var endTime *timestamppb.Timestamp
if !scheduleSpec.EndAt.IsZero() {
endTime = scheduleSpec.EndAt
endTime = timestamppb.New(scheduleSpec.EndAt)
}

return &schedulepb.ScheduleSpec{
StructuredCalendar: calendar,
Interval: intervals,
CronString: scheduleSpec.CronExpressions,
ExcludeStructuredCalendar: skip,
StartTime: timestamppb.New(startTime),
EndTime: timestamppb.New(endTime),
StartTime: startTime,
EndTime: endTime,
Jitter: durationpb.New(scheduleSpec.Jitter),
// TODO support custom time zone data
TimezoneName: scheduleSpec.TimeZoneName,
Expand All @@ -428,21 +426,21 @@ func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleS
intervals := make([]ScheduleIntervalSpec, len(scheduleSpec.GetInterval()))
for i, s := range scheduleSpec.GetInterval() {
intervals[i] = ScheduleIntervalSpec{
Every: duration.Value(s.Interval),
Offset: duration.Value(s.Phase),
Every: s.Interval.AsDuration(),
Offset: s.Phase.AsDuration(),
}
}

skip := convertFromPBScheduleCalendarSpecList(scheduleSpec.GetExcludeStructuredCalendar())

startAt := time.Time{}
if scheduleSpec.GetStartTime() != nil {
startAt = timestamp.Value(scheduleSpec.GetStartTime())
startAt = scheduleSpec.GetStartTime().AsTime()
}

endAt := time.Time{}
if scheduleSpec.GetEndTime() != nil {
endAt = timestamp.Value(scheduleSpec.GetEndTime())
endAt = scheduleSpec.GetEndTime().AsTime()
}

return &ScheduleSpec{
Expand All @@ -451,7 +449,7 @@ func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleS
Skip: skip,
StartAt: startAt,
EndAt: endAt,
Jitter: duration.Value(scheduleSpec.GetJitter()),
Jitter: scheduleSpec.GetJitter().AsDuration(),
TimeZoneName: scheduleSpec.GetTimezoneName(),
}
}
Expand All @@ -473,7 +471,7 @@ func scheduleDescriptionFromPB(describeResponse *workflowservice.DescribeSchedul

nextActionTimes := make([]time.Time, len(describeResponse.Info.GetFutureActionTimes()))
for i, t := range describeResponse.Info.GetFutureActionTimes() {
nextActionTimes[i] = timestamp.Value(t)
nextActionTimes[i] = t.AsTime()
}

actionDescription, err := convertFromPBScheduleAction(describeResponse.Schedule.Action)
Expand All @@ -487,7 +485,7 @@ func scheduleDescriptionFromPB(describeResponse *workflowservice.DescribeSchedul
Spec: convertFromPBScheduleSpec(describeResponse.Schedule.Spec),
Policy: &SchedulePolicies{
Overlap: describeResponse.Schedule.Policies.GetOverlapPolicy(),
CatchupWindow: duration.Value(describeResponse.Schedule.Policies.GetCatchupWindow()),
CatchupWindow: describeResponse.Schedule.Policies.GetCatchupWindow().AsDuration(),
PauseOnFailure: describeResponse.Schedule.Policies.GetPauseOnFailure(),
},
State: &ScheduleState{
Expand All @@ -504,8 +502,8 @@ func scheduleDescriptionFromPB(describeResponse *workflowservice.DescribeSchedul
RunningWorkflows: runningWorkflows,
RecentActions: recentActions,
NextActionTimes: nextActionTimes,
CreatedAt: timestamp.Value(describeResponse.Info.GetCreateTime()),
LastUpdateAt: timestamp.Value(describeResponse.Info.GetUpdateTime()),
CreatedAt: describeResponse.Info.GetCreateTime().AsTime(),
LastUpdateAt: describeResponse.Info.GetUpdateTime().AsTime(),
},
Memo: describeResponse.Memo,
SearchAttributes: describeResponse.SearchAttributes,
Expand Down Expand Up @@ -544,7 +542,7 @@ func convertFromPBScheduleListEntry(schedule *schedulepb.ScheduleListEntry) *Sch

nextActionTimes := make([]time.Time, len(schedule.Info.GetFutureActionTimes()))
for i, t := range schedule.Info.GetFutureActionTimes() {
nextActionTimes[i] = timestamp.Value(t)
nextActionTimes[i] = t.AsTime()
}

return &ScheduleListEntry{
Expand Down Expand Up @@ -651,9 +649,9 @@ func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAct
Workflow: workflow.WorkflowType.GetName(),
Args: args,
TaskQueue: workflow.TaskQueue.GetName(),
WorkflowExecutionTimeout: duration.Value(workflow.GetWorkflowExecutionTimeout()),
WorkflowRunTimeout: duration.Value(workflow.GetWorkflowRunTimeout()),
WorkflowTaskTimeout: duration.Value(workflow.GetWorkflowTaskTimeout()),
WorkflowExecutionTimeout: workflow.GetWorkflowExecutionTimeout().AsDuration(),
WorkflowRunTimeout: workflow.GetWorkflowRunTimeout().AsDuration(),
WorkflowTaskTimeout: workflow.GetWorkflowTaskTimeout().AsDuration(),
RetryPolicy: convertFromPBRetryPolicy(workflow.RetryPolicy),
Memo: memos,
SearchAttributes: searchAttributes,
Expand Down Expand Up @@ -777,8 +775,8 @@ func convertFromPBScheduleActionResultList(aa []*schedulepb.ScheduleActionResult
}
}
recentActions[i] = ScheduleActionResult{
ScheduleTime: timestamp.Value(a.GetScheduleTime()),
ActualTime: timestamp.Value(a.GetActualTime()),
ScheduleTime: a.GetScheduleTime().AsTime(),
ActualTime: a.GetActualTime().AsTime(),
StartWorkflowResult: workflowExecution,
}
}
Expand Down
12 changes: 5 additions & 7 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ import (
"go.temporal.io/api/sdk/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/types/duration"
"go.temporal.io/api/types/timestamp"
"go.temporal.io/api/workflowservice/v1"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
Expand Down Expand Up @@ -676,12 +674,12 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice.
FirstRunID: attributes.FirstExecutionRunId,
WorkflowType: WorkflowType{Name: task.WorkflowType.GetName()},
TaskQueueName: taskQueue.GetName(),
WorkflowExecutionTimeout: duration.Value(attributes.GetWorkflowExecutionTimeout()),
WorkflowRunTimeout: duration.Value(attributes.GetWorkflowRunTimeout()),
WorkflowTaskTimeout: duration.Value(attributes.GetWorkflowTaskTimeout()),
WorkflowExecutionTimeout: attributes.GetWorkflowExecutionTimeout().AsDuration(),
WorkflowRunTimeout: attributes.GetWorkflowRunTimeout().AsDuration(),
WorkflowTaskTimeout: attributes.GetWorkflowTaskTimeout().AsDuration(),
Namespace: wth.namespace,
Attempt: attributes.GetAttempt(),
WorkflowStartTime: timestamp.Value(startedEvent.GetEventTime()),
WorkflowStartTime: startedEvent.GetEventTime().AsTime(),
lastCompletionResult: attributes.LastCompletionResult,
lastFailure: attributes.ContinuedFailure,
CronSchedule: attributes.CronSchedule,
Expand Down Expand Up @@ -2023,7 +2021,7 @@ func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice
canCtx, cancel := context.WithCancel(rootCtx)
defer cancel()

heartbeatThrottleInterval := ath.getHeartbeatThrottleInterval(duration.Value(t.GetHeartbeatTimeout()))
heartbeatThrottleInterval := ath.getHeartbeatThrottleInterval(t.GetHeartbeatTimeout().AsDuration())
invoker := newServiceInvoker(
t.TaskToken, ath.identity, ath.service, ath.metricsHandler, cancel, heartbeatThrottleInterval,
ath.workerStopCh, ath.namespace)
Expand Down
Loading

0 comments on commit f55b915

Please sign in to comment.