diff --git a/common/persistence/cassandra/cassandraVisibilityPersistence.go b/common/persistence/cassandra/cassandraVisibilityPersistence.go index 481e9b0d12d..b94966866bb 100644 --- a/common/persistence/cassandra/cassandraVisibilityPersistence.go +++ b/common/persistence/cassandra/cassandraVisibilityPersistence.go @@ -38,19 +38,13 @@ import ( // Fixed namespace values for now const ( - namespacePartition = 0 - defaultCloseTTLSeconds = 86400 - openExecutionTTLBuffer = int64(86400) // setting it to a day to account for shard going down + namespacePartition = 0 // ref: https://docs.datastax.com/en/dse-trblshoot/doc/troubleshooting/recoveringTtlYear2038Problem.html maxCassandraTTL = int64(315360000) // Cassandra max support time is 2038-01-19T03:14:06+00:00. Updated this to 10 years to support until year 2028 ) const ( - templateCreateWorkflowExecutionStartedWithTTL = `INSERT INTO open_executions (` + - `namespace_id, namespace_partition, workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, task_queue) ` + - `VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?` - templateCreateWorkflowExecutionStarted = `INSERT INTO open_executions (` + `namespace_id, namespace_partition, workflow_id, run_id, start_time, execution_time, workflow_type_name, memo, encoding, task_queue) ` + `VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` @@ -164,37 +158,22 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted(request func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStartedV2( request *p.InternalRecordWorkflowExecutionStartedRequest) error { - ttl := request.RunTimeout + openExecutionTTLBuffer - var query gocql.Query - if ttl > maxCassandraTTL { - query = v.session.Query(templateCreateWorkflowExecutionStarted, - request.NamespaceID, - namespacePartition, - request.WorkflowID, - request.RunID, - p.UnixNanoToDBTimestamp(request.StartTimestamp), - p.UnixNanoToDBTimestamp(request.ExecutionTimestamp), - request.WorkflowTypeName, - request.Memo.Data, - request.Memo.EncodingType.String(), - request.TaskQueue, - ) - } else { - query = v.session.Query(templateCreateWorkflowExecutionStartedWithTTL, - request.NamespaceID, - namespacePartition, - request.WorkflowID, - request.RunID, - p.UnixNanoToDBTimestamp(request.StartTimestamp), - p.UnixNanoToDBTimestamp(request.ExecutionTimestamp), - request.WorkflowTypeName, - request.Memo.Data, - request.Memo.EncodingType.String(), - request.TaskQueue, - ttl, - ) - } + query := v.session.Query(templateCreateWorkflowExecutionStarted, + request.NamespaceID, + namespacePartition, + request.WorkflowID, + request.RunID, + p.UnixNanoToDBTimestamp(request.StartTimestamp), + p.UnixNanoToDBTimestamp(request.ExecutionTimestamp), + request.WorkflowTypeName, + request.Memo.Data, + request.Memo.EncodingType.String(), + request.TaskQueue, + ) + // It is important to specify timestamp for all `open_executions` queries because + // we are using milliseconds instead of default microseconds. If custom timestamp collides with + // default timestamp, default one will always win because they are 1000 times bigger. query = query.WithTimestamp(p.UnixNanoToDBTimestamp(request.StartTimestamp)) err := query.Exec() return gocql.ConvertError("RecordWorkflowExecutionStarted", err) @@ -220,12 +199,14 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosedV2( // Next, add a row in the closed table. // Find how long to keep the row - retention := request.RetentionSeconds - if retention == 0 { - retention = defaultCloseTTLSeconds + var retentionSeconds int64 + if request.Retention != nil { + retentionSeconds = int64(request.Retention.Seconds()) + } else { + retentionSeconds = maxCassandraTTL + 1 } - if retention > maxCassandraTTL { + if retentionSeconds > maxCassandraTTL { batch.Query(templateCreateWorkflowExecutionClosed, request.NamespaceID, namespacePartition, @@ -256,21 +237,25 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosedV2( request.Memo.Data, request.Memo.EncodingType.String(), request.TaskQueue, - retention, + retentionSeconds, ) } - // RecordWorkflowExecutionStarted is using StartTimestamp as - // the timestamp to issue query to Cassandra - // due to the fact that cross DC using mutable state creation time as workflow start time - // and visibility using event time instead of last update time (#1501) - // CloseTimestamp can be before StartTimestamp, meaning using CloseTimestamp - // can cause the deletion of open visibility record to be ignored. - queryTimeStamp := request.CloseTimestamp - if queryTimeStamp < request.StartTimestamp { - queryTimeStamp = request.StartTimestamp + time.Second.Nanoseconds() + // RecordWorkflowExecutionStarted is using StartTimestamp as the timestamp for every query in `open_executions` table. + // Due to the fact that cross DC using mutable state creation time as workflow start time and visibility using event time + // instead of last update time (https://github.com/uber/cadence/pull/1501) CloseTimestamp can be before StartTimestamp (or very close it). + // In this case, use (StartTimestamp + minWorkflowDuration) for delete operation to guarantee that it is greater than StartTimestamp + // and won't be ignored. + + const minWorkflowDuration = time.Second + var batchTimestamp int64 + if request.CloseTimestamp-request.StartTimestamp < minWorkflowDuration.Nanoseconds() { + batchTimestamp = request.StartTimestamp + minWorkflowDuration.Nanoseconds() + } else { + batchTimestamp = request.CloseTimestamp } - batch = batch.WithTimestamp(p.UnixNanoToDBTimestamp(queryTimeStamp)) + + batch = batch.WithTimestamp(p.UnixNanoToDBTimestamp(batchTimestamp)) err := v.session.ExecuteBatch(batch) return gocql.ConvertError("RecordWorkflowExecutionClosed", err) } diff --git a/common/persistence/persistence-tests/visibilityPersistenceTest.go b/common/persistence/persistence-tests/visibilityPersistenceTest.go index 1b7974d42d4..2ac9b79f51a 100644 --- a/common/persistence/persistence-tests/visibilityPersistenceTest.go +++ b/common/persistence/persistence-tests/visibilityPersistenceTest.go @@ -33,6 +33,7 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" workflowpb "go.temporal.io/api/workflow/v1" + "go.temporal.io/server/common/persistence/cassandra" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/payload" @@ -138,7 +139,7 @@ func (s *VisibilityPersistenceSuite) TestBasicVisibilityTimeSkew() { RunId: "fb15e4b5-356f-466d-8c6d-a29223e5c536", } - startTime := time.Now().UTC().Add(time.Second * -5).UnixNano() + startTime := time.Now().UTC().UnixNano() err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&p.RecordWorkflowExecutionStartedRequest{ VisibilityRequestBase: &p.VisibilityRequestBase{ NamespaceID: testNamespaceUUID, @@ -147,7 +148,7 @@ func (s *VisibilityPersistenceSuite) TestBasicVisibilityTimeSkew() { StartTimestamp: startTime, }, }) - s.Nil(err0) + s.NoError(err0) resp, err1 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{ NamespaceID: testNamespaceUUID, @@ -155,7 +156,7 @@ func (s *VisibilityPersistenceSuite) TestBasicVisibilityTimeSkew() { EarliestStartTime: startTime, LatestStartTime: startTime, }) - s.Nil(err1) + s.NoError(err1) s.Equal(1, len(resp.Executions)) s.Equal(workflowExecution.WorkflowId, resp.Executions[0].Execution.WorkflowId) @@ -166,9 +167,9 @@ func (s *VisibilityPersistenceSuite) TestBasicVisibilityTimeSkew() { WorkflowTypeName: "visibility-workflow", StartTimestamp: startTime, }, - CloseTimestamp: startTime - (10 * time.Second).Nanoseconds(), + CloseTimestamp: startTime - (10 * time.Millisecond).Nanoseconds(), }) - s.Nil(err2) + s.NoError(err2) resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{ NamespaceID: testNamespaceUUID, @@ -176,19 +177,134 @@ func (s *VisibilityPersistenceSuite) TestBasicVisibilityTimeSkew() { EarliestStartTime: startTime, LatestStartTime: startTime, }) - s.Nil(err3) + s.NoError(err3) s.Equal(0, len(resp.Executions)) resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(&p.ListWorkflowExecutionsRequest{ NamespaceID: testNamespaceUUID, PageSize: 1, - EarliestStartTime: startTime - (15 * time.Second).Nanoseconds(), - LatestStartTime: time.Now().UnixNano(), + EarliestStartTime: startTime - (10 * time.Millisecond).Nanoseconds(), // This is actually close_time + LatestStartTime: startTime - (10 * time.Millisecond).Nanoseconds(), }) - s.Nil(err4) + s.NoError(err4) + s.Equal(1, len(resp.Executions)) +} + +func (s *VisibilityPersistenceSuite) TestBasicVisibilityShortWorkflow() { + testNamespaceUUID := uuid.New() + + workflowExecution := commonpb.WorkflowExecution{ + WorkflowId: "visibility-workflow-test-short-workflow", + RunId: "3c095198-0c33-4136-939a-c29fbbb6a80b", + } + + startTime := time.Now().UTC().UnixNano() + err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&p.RecordWorkflowExecutionStartedRequest{ + VisibilityRequestBase: &p.VisibilityRequestBase{ + NamespaceID: testNamespaceUUID, + Execution: workflowExecution, + WorkflowTypeName: "visibility-workflow", + StartTimestamp: startTime, + }, + }) + s.NoError(err0) + + err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(&p.RecordWorkflowExecutionClosedRequest{ + VisibilityRequestBase: &p.VisibilityRequestBase{ + NamespaceID: testNamespaceUUID, + Execution: workflowExecution, + WorkflowTypeName: "visibility-workflow", + StartTimestamp: startTime, + }, + CloseTimestamp: startTime + (10 * time.Millisecond).Nanoseconds(), + }) + s.NoError(err2) + + resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{ + NamespaceID: testNamespaceUUID, + PageSize: 1, + EarliestStartTime: startTime, + LatestStartTime: startTime, + }) + s.NoError(err3) + s.Equal(0, len(resp.Executions)) + + resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(&p.ListWorkflowExecutionsRequest{ + NamespaceID: testNamespaceUUID, + PageSize: 1, + EarliestStartTime: startTime + (10 * time.Millisecond).Nanoseconds(), // This is actually close_time + LatestStartTime: startTime + (10 * time.Millisecond).Nanoseconds(), + }) + s.NoError(err4) s.Equal(1, len(resp.Executions)) } +func (s *VisibilityPersistenceSuite) TestVisibilityRetention() { + if _, ok := s.VisibilityTestCluster.(*cassandra.TestCluster); !ok { + return + } + + testNamespaceUUID := uuid.New() + + workflowExecution := commonpb.WorkflowExecution{ + WorkflowId: "visibility-workflow-test-visibility-retention", + RunId: "3c095198-0c33-4136-939a-c29fbbb6a802", + } + + startTime := time.Now().UTC().Add(-1 * time.Hour).UnixNano() + err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&p.RecordWorkflowExecutionStartedRequest{ + VisibilityRequestBase: &p.VisibilityRequestBase{ + NamespaceID: testNamespaceUUID, + Execution: workflowExecution, + WorkflowTypeName: "visibility-workflow", + StartTimestamp: startTime, + }, + }) + s.NoError(err0) + + retention := 1 * time.Second + err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(&p.RecordWorkflowExecutionClosedRequest{ + VisibilityRequestBase: &p.VisibilityRequestBase{ + NamespaceID: testNamespaceUUID, + Execution: workflowExecution, + WorkflowTypeName: "visibility-workflow", + StartTimestamp: startTime, + }, + CloseTimestamp: startTime + (1 * time.Minute).Nanoseconds(), + Retention: &retention, + }) + s.NoError(err2) + + resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{ + NamespaceID: testNamespaceUUID, + PageSize: 1, + EarliestStartTime: startTime, + LatestStartTime: startTime, + }) + s.NoError(err3) + s.Equal(0, len(resp.Executions)) + + resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(&p.ListWorkflowExecutionsRequest{ + NamespaceID: testNamespaceUUID, + PageSize: 1, + EarliestStartTime: startTime + (1 * time.Minute).Nanoseconds(), // This is actually close_time + LatestStartTime: startTime + (1 * time.Minute).Nanoseconds(), + }) + s.NoError(err4) + s.Equal(1, len(resp.Executions)) + + // Sleep for retention to fire. + time.Sleep(retention) + resp2, err5 := s.VisibilityMgr.ListClosedWorkflowExecutions(&p.ListWorkflowExecutionsRequest{ + NamespaceID: testNamespaceUUID, + PageSize: 1, + EarliestStartTime: startTime + (1 * time.Minute).Nanoseconds(), // This is actually close_time + LatestStartTime: startTime + (1 * time.Minute).Nanoseconds(), + }) + s.NoError(err5) + s.Equal(0, len(resp2.Executions)) +} + // TestVisibilityPagination test func (s *VisibilityPersistenceSuite) TestVisibilityPagination() { testNamespaceUUID := uuid.New() @@ -746,7 +862,6 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() { }, Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, }, - WorkflowTimeout: 0, }, expected: nil, }, @@ -764,7 +879,6 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() { SearchAttributes: nil, Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, }, - WorkflowTimeout: 0, }, // To avoid blocking the task queue processors on non-ElasticSearch visibility stores // we simply treat any attempts to perform Upserts as "no-ops" diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index 1f32b1eac3f..0c5ec175b65 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -481,22 +481,19 @@ type ( // InternalRecordWorkflowExecutionStartedRequest request to RecordWorkflowExecutionStarted InternalRecordWorkflowExecutionStartedRequest struct { *InternalVisibilityRequestBase - RunTimeout int64 } // InternalRecordWorkflowExecutionClosedRequest is request to RecordWorkflowExecutionClosed InternalRecordWorkflowExecutionClosedRequest struct { *InternalVisibilityRequestBase - CloseTimestamp int64 - HistoryLength int64 - RetentionSeconds int64 + CloseTimestamp int64 + HistoryLength int64 + Retention *time.Duration } // InternalUpsertWorkflowExecutionRequest is request to UpsertWorkflowExecution InternalUpsertWorkflowExecutionRequest struct { *InternalVisibilityRequestBase - // TODO (alex): not used, remove - WorkflowTimeout int64 } // InternalCreateNamespaceRequest is used to create the namespace diff --git a/common/persistence/visibilityInterfaces.go b/common/persistence/visibilityInterfaces.go index 2e7c2dec6d3..03935ac471e 100644 --- a/common/persistence/visibilityInterfaces.go +++ b/common/persistence/visibilityInterfaces.go @@ -25,6 +25,8 @@ package persistence import ( + "time" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" @@ -55,21 +57,19 @@ type ( // RecordWorkflowExecutionStartedRequest is used to add a record of a newly started execution RecordWorkflowExecutionStartedRequest struct { *VisibilityRequestBase - RunTimeout int64 // not persisted, used for cassandra ttl } // RecordWorkflowExecutionClosedRequest is used to add a record of a closed execution RecordWorkflowExecutionClosedRequest struct { *VisibilityRequestBase - CloseTimestamp int64 - HistoryLength int64 - RetentionSeconds int64 + CloseTimestamp int64 + HistoryLength int64 + Retention *time.Duration // not persisted, used for cassandra ttl } // UpsertWorkflowExecutionRequest is used to upsert workflow execution UpsertWorkflowExecutionRequest struct { *VisibilityRequestBase - WorkflowTimeout int64 // not persisted, used for cassandra ttl } // ListWorkflowExecutionsRequest is used to list executions in a namespace diff --git a/common/persistence/visibilityStore.go b/common/persistence/visibilityStore.go index 2d4381debe7..22dcf82a1a8 100644 --- a/common/persistence/visibilityStore.go +++ b/common/persistence/visibilityStore.go @@ -68,7 +68,6 @@ func (v *visibilityManagerImpl) GetName() string { func (v *visibilityManagerImpl) RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error { req := &InternalRecordWorkflowExecutionStartedRequest{ InternalVisibilityRequestBase: v.newInternalVisibilityRequestBase(request.VisibilityRequestBase), - RunTimeout: request.RunTimeout, } return v.persistence.RecordWorkflowExecutionStarted(req) } @@ -76,7 +75,6 @@ func (v *visibilityManagerImpl) RecordWorkflowExecutionStarted(request *RecordWo func (v *visibilityManagerImpl) RecordWorkflowExecutionStartedV2(request *RecordWorkflowExecutionStartedRequest) error { req := &InternalRecordWorkflowExecutionStartedRequest{ InternalVisibilityRequestBase: v.newInternalVisibilityRequestBase(request.VisibilityRequestBase), - RunTimeout: request.RunTimeout, } return v.persistence.RecordWorkflowExecutionStartedV2(req) } @@ -87,7 +85,7 @@ func (v *visibilityManagerImpl) RecordWorkflowExecutionClosed(request *RecordWor InternalVisibilityRequestBase: v.newInternalVisibilityRequestBase(request.VisibilityRequestBase), CloseTimestamp: request.CloseTimestamp, HistoryLength: request.HistoryLength, - RetentionSeconds: request.RetentionSeconds, + Retention: request.Retention, } return v.persistence.RecordWorkflowExecutionClosed(req) } @@ -97,7 +95,6 @@ func (v *visibilityManagerImpl) RecordWorkflowExecutionClosedV2(request *RecordW InternalVisibilityRequestBase: v.newInternalVisibilityRequestBase(request.VisibilityRequestBase), CloseTimestamp: request.CloseTimestamp, HistoryLength: request.HistoryLength, - RetentionSeconds: request.RetentionSeconds, } return v.persistence.RecordWorkflowExecutionClosedV2(req) } @@ -106,7 +103,6 @@ func (v *visibilityManagerImpl) RecordWorkflowExecutionClosedV2(request *RecordW func (v *visibilityManagerImpl) UpsertWorkflowExecution(request *UpsertWorkflowExecutionRequest) error { req := &InternalUpsertWorkflowExecutionRequest{ InternalVisibilityRequestBase: v.newInternalVisibilityRequestBase(request.VisibilityRequestBase), - WorkflowTimeout: request.WorkflowTimeout, } return v.persistence.UpsertWorkflowExecution(req) } @@ -114,7 +110,6 @@ func (v *visibilityManagerImpl) UpsertWorkflowExecution(request *UpsertWorkflowE func (v *visibilityManagerImpl) UpsertWorkflowExecutionV2(request *UpsertWorkflowExecutionRequest) error { req := &InternalUpsertWorkflowExecutionRequest{ InternalVisibilityRequestBase: v.newInternalVisibilityRequestBase(request.VisibilityRequestBase), - WorkflowTimeout: request.WorkflowTimeout, } return v.persistence.UpsertWorkflowExecutionV2(req) } diff --git a/service/history/transferQueueActiveTaskExecutorV2_test.go b/service/history/transferQueueActiveTaskExecutorV2_test.go index e39e210692e..5b6d570d111 100644 --- a/service/history/transferQueueActiveTaskExecutorV2_test.go +++ b/service/history/transferQueueActiveTaskExecutorV2_test.go @@ -1990,7 +1990,6 @@ func (s *transferQueueActiveTaskExecutorSuiteV2) createRecordWorkflowExecutionSt TaskID: task.GetTaskId(), TaskQueue: task.TaskQueue, }, - RunTimeout: int64(timestamp.DurationValue(executionInfo.WorkflowRunTimeout).Round(time.Second).Seconds()), } } @@ -2123,7 +2122,6 @@ func (s *transferQueueActiveTaskExecutorSuiteV2) createUpsertWorkflowSearchAttri Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, TaskQueue: task.TaskQueue, }, - WorkflowTimeout: int64(timestamp.DurationValue(executionInfo.WorkflowRunTimeout).Round(time.Second).Seconds()), } } diff --git a/service/history/transferQueueStandbyTaskExecutor_test.go b/service/history/transferQueueStandbyTaskExecutor_test.go index 6de7c509d0c..d714a53eb80 100644 --- a/service/history/transferQueueStandbyTaskExecutor_test.go +++ b/service/history/transferQueueStandbyTaskExecutor_test.go @@ -1160,7 +1160,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessRecordWorkflowStarted TaskID: taskID, TaskQueue: taskQueueName, }, - RunTimeout: int64(timestamp.DurationValue(executionInfo.WorkflowRunTimeout).Round(time.Second).Seconds()), }).Return(nil).Once() s.mockShard.SetCurrentTime(s.clusterName, *now) @@ -1228,7 +1227,6 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessUpsertWorkflowSearchA TaskQueue: taskQueueName, Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, }, - WorkflowTimeout: int64(timestamp.DurationValue(executionInfo.WorkflowRunTimeout).Round(time.Second).Seconds()), }).Return(nil).Once() s.mockShard.SetCurrentTime(s.clusterName, *now) diff --git a/service/history/transferQueueTaskExecutorBase.go b/service/history/transferQueueTaskExecutorBase.go index c8c574737b9..8a6305b5f1b 100644 --- a/service/history/transferQueueTaskExecutorBase.go +++ b/service/history/transferQueueTaskExecutorBase.go @@ -195,7 +195,6 @@ func (t *transferQueueTaskExecutorBase) recordWorkflowStarted( TaskQueue: taskQueue, SearchAttributes: searchAttributes, }, - RunTimeout: int64(timestamp.DurationValue(runTimeout).Round(time.Second).Seconds()), } return t.visibilityMgr.RecordWorkflowExecutionStarted(request) } @@ -242,7 +241,6 @@ func (t *transferQueueTaskExecutorBase) upsertWorkflowExecution( TaskQueue: taskQueue, SearchAttributes: searchAttributes, }, - WorkflowTimeout: int64(timestamp.DurationValue(workflowTimeout).Round(time.Second).Seconds()), } return t.visibilityMgr.UpsertWorkflowExecution(request) @@ -265,7 +263,7 @@ func (t *transferQueueTaskExecutorBase) recordWorkflowClosed( ) error { // Record closing in visibility store - retentionSeconds := int64(0) + var retention *time.Duration namespace := defaultNamespace recordWorkflowClose := true archiveVisibility := false @@ -277,7 +275,7 @@ func (t *transferQueueTaskExecutorBase) recordWorkflowClosed( if err == nil { // retention in namespace config is in days, convert to seconds - retentionSeconds = int64(timestamp.DurationFromDays(namespaceEntry.GetRetentionDays(workflowID)).Seconds()) + retention = timestamp.DurationFromDays(namespaceEntry.GetRetentionDays(workflowID)) namespace = namespaceEntry.GetInfo().Name // if sampled for longer retention is enabled, only record those sampled events if namespaceEntry.IsSampledForLongerRetentionEnabled(workflowID) && @@ -308,9 +306,9 @@ func (t *transferQueueTaskExecutorBase) recordWorkflowClosed( TaskQueue: taskQueue, SearchAttributes: searchAttributes, }, - CloseTimestamp: endTime.UnixNano(), - HistoryLength: historyLength, - RetentionSeconds: retentionSeconds, + CloseTimestamp: endTime.UnixNano(), + HistoryLength: historyLength, + Retention: retention, }); err != nil { return err } @@ -345,6 +343,7 @@ func (t *transferQueueTaskExecutorBase) recordWorkflowClosed( return nil } +// TODO: remove func isWorkflowNotExistError(err error) bool { _, ok := err.(*serviceerror.NotFound) return ok diff --git a/service/history/visibilityQueueTaskExecutor.go b/service/history/visibilityQueueTaskExecutor.go index a7b41479d71..fe51bc29529 100644 --- a/service/history/visibilityQueueTaskExecutor.go +++ b/service/history/visibilityQueueTaskExecutor.go @@ -162,7 +162,6 @@ func (t *visibilityQueueTaskExecutor) processStartOrUpsertExecution( executionInfo := mutableState.GetExecutionInfo() executionState := mutableState.GetExecutionState() - runTimeout := executionInfo.WorkflowRunTimeout wfTypeName := executionInfo.WorkflowTypeName startEvent, err := mutableState.GetStartEvent() @@ -189,7 +188,6 @@ func (t *visibilityQueueTaskExecutor) processStartOrUpsertExecution( wfTypeName, startTimestamp.UnixNano(), executionTimestamp.UnixNano(), - runTimeout, task.GetTaskId(), executionStatus, taskQueue, @@ -204,7 +202,6 @@ func (t *visibilityQueueTaskExecutor) processStartOrUpsertExecution( wfTypeName, startTimestamp.UnixNano(), executionTimestamp.UnixNano(), - runTimeout, task.GetTaskId(), executionStatus, taskQueue, @@ -219,7 +216,6 @@ func (t *visibilityQueueTaskExecutor) recordStartExecution( workflowTypeName string, startTimeUnixNano int64, executionTimeUnixNano int64, - runTimeout *time.Duration, taskID int64, status enumspb.WorkflowExecutionStatus, taskQueue string, @@ -260,7 +256,6 @@ func (t *visibilityQueueTaskExecutor) recordStartExecution( TaskQueue: taskQueue, SearchAttributes: searchAttributes, }, - RunTimeout: int64(timestamp.DurationValue(runTimeout).Round(time.Second).Seconds()), } return t.visibilityMgr.RecordWorkflowExecutionStartedV2(request) } @@ -272,7 +267,6 @@ func (t *visibilityQueueTaskExecutor) upsertExecution( workflowTypeName string, startTimeUnixNano int64, executionTimeUnixNano int64, - workflowTimeout *time.Duration, taskID int64, status enumspb.WorkflowExecutionStatus, taskQueue string, @@ -308,7 +302,6 @@ func (t *visibilityQueueTaskExecutor) upsertExecution( TaskQueue: taskQueue, SearchAttributes: searchAttributes, }, - WorkflowTimeout: int64(timestamp.DurationValue(workflowTimeout).Round(time.Second).Seconds()), } return t.visibilityMgr.UpsertWorkflowExecutionV2(request) @@ -407,19 +400,20 @@ func (t *visibilityQueueTaskExecutor) recordCloseExecution( searchAttributes *commonpb.SearchAttributes, ) error { - // Record closing in visibility store - retentionSeconds := int64(0) - namespace := defaultNamespace - recordWorkflowClose := true - namespaceEntry, err := t.shard.GetNamespaceCache().GetNamespaceByID(namespaceID) - if err != nil && !isWorkflowNotExistError(err) { - return err + if err != nil { + if _, notFound := err.(*serviceerror.NotFound); !notFound { + return err + } } - if err == nil { - // retention in namespace config is in days, convert to seconds - retentionSeconds = int64(timestamp.DurationFromDays(namespaceEntry.GetRetentionDays(workflowID)).Seconds()) + var retention *time.Duration + namespace := defaultNamespace + recordWorkflowClose := true + + if namespaceEntry != nil { + // retention in namespace config is in days, convert to time.Duration. + retention = timestamp.DurationFromDays(namespaceEntry.GetRetentionDays(workflowID)) namespace = namespaceEntry.GetInfo().Name // if sampled for longer retention is enabled, only record those sampled events if namespaceEntry.IsSampledForLongerRetentionEnabled(workflowID) && @@ -447,9 +441,9 @@ func (t *visibilityQueueTaskExecutor) recordCloseExecution( TaskQueue: taskQueue, SearchAttributes: searchAttributes, }, - CloseTimestamp: endTime.UnixNano(), - HistoryLength: historyLength, - RetentionSeconds: retentionSeconds, + CloseTimestamp: endTime.UnixNano(), + HistoryLength: historyLength, + Retention: retention, }) } diff --git a/service/history/visibilityQueueTaskExecutor_test.go b/service/history/visibilityQueueTaskExecutor_test.go index 3644b68a700..43e981e8acb 100644 --- a/service/history/visibilityQueueTaskExecutor_test.go +++ b/service/history/visibilityQueueTaskExecutor_test.go @@ -365,7 +365,6 @@ func (s *visibilityQueueTaskExecutorSuite) createRecordWorkflowExecutionStartedR ShardID: s.mockShard.GetShardID(), TaskQueue: taskQueueName, }, - RunTimeout: int64(timestamp.DurationValue(executionInfo.WorkflowRunTimeout).Round(time.Second).Seconds()), } } @@ -395,7 +394,6 @@ func (s *visibilityQueueTaskExecutorSuite) createUpsertWorkflowSearchAttributesR TaskQueue: taskQueueName, ShardID: s.mockShard.GetShardID(), }, - WorkflowTimeout: int64(timestamp.DurationValue(executionInfo.WorkflowRunTimeout).Round(time.Second).Seconds()), } }