diff --git a/pkg/api/v1/testkube/model_test_workflow_extended.go b/pkg/api/v1/testkube/model_test_workflow_extended.go index 6b79acb1245..c2369ab7425 100644 --- a/pkg/api/v1/testkube/model_test_workflow_extended.go +++ b/pkg/api/v1/testkube/model_test_workflow_extended.go @@ -33,6 +33,9 @@ func (w *TestWorkflow) ConvertDots(fn func(string) string) *TestWorkflow { if w.Labels == nil { w.Labels = convertDotsInMap(w.Labels, fn) } + if w.Spec == nil { + return w + } if w.Spec.Pod != nil { w.Spec.Pod.Labels = convertDotsInMap(w.Spec.Pod.Labels, fn) w.Spec.Pod.Annotations = convertDotsInMap(w.Spec.Pod.Annotations, fn) diff --git a/pkg/cloud/data/testworkflow/execution.go b/pkg/cloud/data/testworkflow/execution.go index a0dd8b69d0b..bb913ba26cc 100644 --- a/pkg/cloud/data/testworkflow/execution.go +++ b/pkg/cloud/data/testworkflow/execution.go @@ -88,8 +88,8 @@ func (r *CloudRepository) GetExecutionsSummary(ctx context.Context, filter testw return pass(r.executor, ctx, req, process) } -func (r *CloudRepository) Insert(ctx context.Context, result testkube.TestWorkflowExecution) (err error) { - req := ExecutionInsertRequest{WorkflowExecution: result} +func (r *CloudRepository) Insert(ctx context.Context, result *testkube.TestWorkflowExecution) (err error) { + req := ExecutionInsertRequest{WorkflowExecution: *result} return passNoContent(r.executor, ctx, req) } diff --git a/pkg/repository/testworkflow/interface.go b/pkg/repository/testworkflow/interface.go index cea8a37053e..55831e12358 100644 --- a/pkg/repository/testworkflow/interface.go +++ b/pkg/repository/testworkflow/interface.go @@ -50,7 +50,7 @@ type Repository interface { // GetPreviousFinishedState gets previous finished execution state by test GetPreviousFinishedState(ctx context.Context, testName string, date time.Time) (testkube.TestWorkflowStatus, error) // Insert inserts new execution result - Insert(ctx context.Context, result testkube.TestWorkflowExecution) error + Insert(ctx context.Context, result *testkube.TestWorkflowExecution) error // Update updates execution Update(ctx context.Context, result testkube.TestWorkflowExecution) error // UpdateResult updates execution result diff --git a/pkg/repository/testworkflow/mock_repository.go b/pkg/repository/testworkflow/mock_repository.go index ddf65acc478..daef81dc24e 100644 --- a/pkg/repository/testworkflow/mock_repository.go +++ b/pkg/repository/testworkflow/mock_repository.go @@ -249,7 +249,7 @@ func (mr *MockRepositoryMockRecorder) GetTestWorkflowMetrics(arg0, arg1, arg2, a } // Insert mocks base method. -func (m *MockRepository) Insert(arg0 context.Context, arg1 testkube.TestWorkflowExecution) error { +func (m *MockRepository) Insert(arg0 context.Context, arg1 *testkube.TestWorkflowExecution) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Insert", arg0, arg1) ret0, _ := ret[0].(error) diff --git a/pkg/repository/testworkflow/mongo.go b/pkg/repository/testworkflow/mongo.go index 4d6183ef160..2170fb12169 100644 --- a/pkg/repository/testworkflow/mongo.go +++ b/pkg/repository/testworkflow/mongo.go @@ -13,6 +13,7 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/mongo/writeconcern" "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/repository/sequence" @@ -268,12 +269,42 @@ func (r *MongoRepository) GetExecutionsSummary(ctx context.Context, filter Filte return } -func (r *MongoRepository) Insert(ctx context.Context, result testkube.TestWorkflowExecution) (err error) { +func (r *MongoRepository) Insert(ctx context.Context, result *testkube.TestWorkflowExecution) (err error) { result.EscapeDots() if result.Reports == nil { result.Reports = []testkube.TestWorkflowReport{} } - _, err = r.Coll.InsertOne(ctx, result) + + wc := writeconcern.New(writeconcern.WMajority()) + txnOptions := options.Transaction().SetWriteConcern(wc) + session, err := r.db.Client().StartSession() + if err != nil { + return err + } + defer session.EndSession(ctx) + + _, err = session.WithTransaction(ctx, func(sessCtx mongo.SessionContext) (interface{}, error) { + number, err := r.sequenceRepository.GetNextExecutionNumber(sessCtx, result.Workflow.Name, sequence.ExecutionTypeTestWorkflow) + if err != nil { + return nil, err + } + + result.Number = number + if result.Name == "" { + result.Name = fmt.Sprintf("%s-%d", result.Workflow.Name, result.Number) + } + + // Ensure it is unique name + // TODO: Consider if we shouldn't make name unique across all TestWorkflows + next, _ := r.GetByNameAndTestWorkflow(sessCtx, result.Name, result.Workflow.Name) + if next.Name == result.Name { + return nil, errors.New("execution name already exists") + } + + _, err = r.Coll.InsertOne(sessCtx, result) + return nil, err + }, txnOptions) + return } diff --git a/pkg/repository/testworkflow/mongo_integration_test.go b/pkg/repository/testworkflow/mongo_integration_test.go index f233041c1ac..d85329e30ee 100644 --- a/pkg/repository/testworkflow/mongo_integration_test.go +++ b/pkg/repository/testworkflow/mongo_integration_test.go @@ -12,6 +12,7 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" "github.com/kubeshop/testkube/pkg/api/v1/testkube" + "github.com/kubeshop/testkube/pkg/repository/sequence" ) var ( @@ -32,13 +33,16 @@ func TestNewMongoRepository_UpdateReport_Integration(t *testing.T) { db.Drop(ctx) }) - repo := NewMongoRepository(db, false) + seq := sequence.NewMongoRepository(db) + repo := NewMongoRepository(db, false, WithMongoRepositorySequence(seq)) execution := testkube.TestWorkflowExecution{ - Id: "test-id", - Name: "test-name", + Id: "test-id", + Workflow: &testkube.TestWorkflow{ + Name: "test-name", + }, } - if err := repo.Insert(ctx, execution); err != nil { + if err := repo.Insert(ctx, &execution); err != nil { t.Fatalf("error inserting execution: %v", err) } diff --git a/pkg/testworkflows/testworkflowexecutor/executor.go b/pkg/testworkflows/testworkflowexecutor/executor.go index 637e7a69680..85706b2dbe9 100644 --- a/pkg/testworkflows/testworkflowexecutor/executor.go +++ b/pkg/testworkflows/testworkflowexecutor/executor.go @@ -464,57 +464,21 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor return execution, errors.Wrap(err, "processing error") } - // Load execution identifier data - number, err := e.repository.GetNextExecutionNumber(context.Background(), workflow.Name) - if err != nil { - log.DefaultLogger.Errorw("failed to retrieve TestWorkflow execution number", "id", id, "error", err) - } - - executionName := request.Name - if executionName == "" { - executionName = fmt.Sprintf("%s-%d", workflow.Name, number) - } - testWorkflowExecutionName := request.TestWorkflowExecutionName - // Ensure it is unique name - // TODO: Consider if we shouldn't make name unique across all TestWorkflows - next, _ := e.repository.GetByNameAndTestWorkflow(ctx, executionName, workflow.Name) - if next.Name == executionName { - return execution, errors.Wrap(err, "execution name already exists") - } - - // Build machine with actual execution data - executionMachine := expressions.NewMachine().Register("execution", map[string]interface{}{ - "id": id, - "name": executionName, - "number": number, - "scheduledAt": now.UTC().Format(constants.RFC3339Millis), - "disableWebhooks": request.DisableWebhooks, - }) - - // Process the TestWorkflow - bundle, err := e.processor.Bundle(ctx, &workflow, machine, executionMachine) - if err != nil { - return execution, errors.Wrap(err, "processing error") - } - // Build Execution entity // TODO: Consider storing "config" as well execution = testkube.TestWorkflowExecution{ Id: id, - Name: executionName, + Name: request.Name, Namespace: namespace, - Number: number, ScheduledAt: now, StatusAt: now, - Signature: stage.MapSignatureListToInternal(bundle.Signature), Result: &testkube.TestWorkflowResult{ Status: common.Ptr(testkube.QUEUED_TestWorkflowStatus), PredictedStatus: common.Ptr(testkube.PASSED_TestWorkflowStatus), Initialization: &testkube.TestWorkflowStepResult{ Status: common.Ptr(testkube.QUEUED_TestWorkflowStepStatus), }, - Steps: stage.MapSignatureListToStepResults(bundle.Signature), }, Output: []testkube.TestWorkflowOutput{}, Workflow: testworkflowmappers.MapKubeToAPI(initialWorkflow), @@ -522,11 +486,34 @@ func (e *executor) Execute(ctx context.Context, workflow testworkflowsv1.TestWor TestWorkflowExecutionName: testWorkflowExecutionName, DisableWebhooks: request.DisableWebhooks, } - err = e.repository.Insert(ctx, execution) + err = e.repository.Insert(ctx, &execution) if err != nil { return execution, errors.Wrap(err, "inserting execution to storage") } + // Build machine with actual execution data + executionMachine := expressions.NewMachine().Register("execution", map[string]interface{}{ + "id": id, + "name": execution.Name, + "number": execution.Number, + "scheduledAt": now.UTC().Format(constants.RFC3339Millis), + "disableWebhooks": request.DisableWebhooks, + }) + + // Process the TestWorkflow + bundle, err := e.processor.Bundle(ctx, &workflow, machine, executionMachine) + if err != nil { + return execution, errors.Wrap(err, "processing error") + } + + execution.Signature = stage.MapSignatureListToInternal(bundle.Signature) + execution.Result.Steps = stage.MapSignatureListToStepResults(bundle.Signature) + + err = e.repository.Update(ctx, execution) + if err != nil { + return execution, errors.Wrap(err, "updating execution to storage") + } + // Inform about execution start e.emitter.Notify(testkube.NewEventQueueTestWorkflow(&execution))