Skip to content

Commit

Permalink
Work on inverted workers
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Aug 4, 2023
1 parent 1ec43ad commit 3b51434
Show file tree
Hide file tree
Showing 7 changed files with 947 additions and 11 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/stretchr/testify v1.8.3
go.temporal.io/api v1.21.0
go.uber.org/atomic v1.9.0
golang.org/x/sys v0.8.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.55.0
google.golang.org/protobuf v1.30.0
Expand Down
6 changes: 3 additions & 3 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func WithActivityTask(
scheduleToCloseTimeout := common.DurationValue(task.GetScheduleToCloseTimeout())
startToCloseTimeout := common.DurationValue(task.GetStartToCloseTimeout())
heartbeatTimeout := common.DurationValue(task.GetHeartbeatTimeout())
deadline := calculateActivityDeadline(scheduled, started, scheduleToCloseTimeout, startToCloseTimeout)
deadline := CalculateActivityDeadline(scheduled, started, scheduleToCloseTimeout, startToCloseTimeout)

logger = log.With(logger,
tagActivityID, task.ActivityId,
Expand Down Expand Up @@ -331,7 +331,7 @@ func WithLocalActivityTask(
if scheduleToCloseTimeout == 0 {
scheduleToCloseTimeout = startToCloseTimeout
}
deadline := calculateActivityDeadline(task.scheduledTime, startedTime, scheduleToCloseTimeout, startToCloseTimeout)
deadline := CalculateActivityDeadline(task.scheduledTime, startedTime, scheduleToCloseTimeout, startToCloseTimeout)
if task.attempt > 1 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) {
// this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout
deadline = task.expireTime
Expand Down Expand Up @@ -382,7 +382,7 @@ func newActivityContext(
return ctx, nil
}

func calculateActivityDeadline(scheduled, started time.Time, scheduleToCloseTimeout, startToCloseTimeout time.Duration) time.Time {
func CalculateActivityDeadline(scheduled, started time.Time, scheduleToCloseTimeout, startToCloseTimeout time.Duration) time.Time {
startToCloseDeadline := started.Add(startToCloseTimeout)
if scheduleToCloseTimeout > 0 {
scheduleToCloseDeadline := scheduled.Add(scheduleToCloseTimeout)
Expand Down
26 changes: 18 additions & 8 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ type (
// worker options, the ones here wrap the ones in worker options. The same
// interceptor should not be set here and in worker options.
Interceptors []ClientInterceptor

overrideWorkflowService workflowservice.WorkflowServiceClient
}

// HeadersProvider returns a map of gRPC headers that should be used on every request.
Expand Down Expand Up @@ -669,6 +671,10 @@ type (
}
)

func OverrideWorkflowService(c *ClientOptions, workflowService workflowservice.WorkflowServiceClient) {
c.overrideWorkflowService = workflowService
}

// DialClient creates a client and attempts to connect to the server.
func DialClient(options ClientOptions) (Client, error) {
options.ConnectionOptions.disableEagerConnection = false
Expand Down Expand Up @@ -719,19 +725,23 @@ func newClient(options ClientOptions, existing *WorkflowClient) (Client, error)
}

// Dial or use existing connection
serviceClient := options.overrideWorkflowService
var connection *grpc.ClientConn
var err error
if existing == nil {
options.ConnectionOptions.excludeInternalFromRetry = uberatomic.NewBool(false)
connection, err = dial(newDialParameters(&options, options.ConnectionOptions.excludeInternalFromRetry))
if err != nil {
return nil, err
if serviceClient == nil {
if existing == nil {
options.ConnectionOptions.excludeInternalFromRetry = uberatomic.NewBool(false)
connection, err = dial(newDialParameters(&options, options.ConnectionOptions.excludeInternalFromRetry))
if err != nil {
return nil, err
}
} else {
connection = existing.conn
}
} else {
connection = existing.conn
serviceClient = workflowservice.NewWorkflowServiceClient(connection)
}

client := NewServiceClient(workflowservice.NewWorkflowServiceClient(connection), connection, options)
client := NewServiceClient(serviceClient, connection, options)

// If using existing connection, always load its capabilities and use them for
// the new connection. Otherwise, only load server capabilities eagerly if not
Expand Down
14 changes: 14 additions & 0 deletions worker/invertedworker/durable_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package invertedworker

import (
"context"

"go.temporal.io/api/common/v1"
"go.temporal.io/api/history/v1"
)

type DurableStore interface {
AppendPartialHistory(context.Context, *common.WorkflowExecution, *history.History) error
GetHistory(context.Context, *common.WorkflowExecution) (*history.History, error)
PurgeHistory(context.Context, *common.WorkflowExecution) error
}
Loading

0 comments on commit 3b51434

Please sign in to comment.