Skip to content

Commit

Permalink
fix workflow auto approval job finding (#13457)
Browse files Browse the repository at this point in the history
  • Loading branch information
krehermann authored Jun 7, 2024
1 parent a4466c6 commit 3456651
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 13 deletions.
7 changes: 1 addition & 6 deletions core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error {
}
}
case job.Workflow:
existingJobID, txerr = findExistingWorkflowJob(ctx, *j.WorkflowSpec, tx.jobORM)
existingJobID, txerr = tx.jobORM.FindJobIDByWorkflow(ctx, *j.WorkflowSpec)
if txerr != nil {
// Return an error if the repository errors. If there is a not found
// error we want to continue with approving the job.
Expand Down Expand Up @@ -1108,11 +1108,6 @@ func (s *service) observeJobProposalCounts(ctx context.Context) error {
return nil
}

// TODO KS-205 implement this. Need to figure out how exactly how we want to handle this.
func findExistingWorkflowJob(ctx context.Context, wfSpec job.WorkflowSpec, tx job.ORM) (int32, error) {
return 0, nil
}

// findExistingJobForOCR2 looks for existing job for OCR2
func findExistingJobForOCR2(ctx context.Context, j *job.Job, tx job.ORM) (int32, error) {
var contractID string
Expand Down
82 changes: 75 additions & 7 deletions core/services/feeds/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,7 @@ targets:
wfSpec = testspecs.GenerateWorkflowSpec(wfID, wfOwner, wfName, specYaml).Toml()
proposalIDWF = int64(11)
jobProposalSpecIdWF = int64(101)
jobIDWF = int32(1001)
remoteUUIDWF = uuid.New()
argsWF = &feeds.ProposeJobArgs{
FeedsManagerID: 1,
Expand All @@ -698,12 +699,26 @@ targets:
RemoteUUID: remoteUUIDWF,
Status: feeds.JobProposalStatusPending,
}
acceptedjpWF = feeds.JobProposal{
ID: 13,
FeedsManagerID: 1,
Name: null.StringFrom("test-spec"),
RemoteUUID: remoteUUIDWF,
Status: feeds.JobProposalStatusPending,
}
proposalSpecWF = feeds.JobProposalSpec{
Definition: wfSpec,
Status: feeds.SpecStatusPending,
Version: 1,
JobProposalID: proposalIDWF,
}
autoApprovableProposalSpecWF = feeds.JobProposalSpec{
ID: jobProposalSpecIdWF,
Definition: wfSpec,
Status: feeds.SpecStatusPending,
Version: 1,
JobProposalID: proposalIDWF,
}
)

testCases := []struct {
Expand All @@ -714,7 +729,7 @@ targets:
wantErr string
}{
{
name: "Auto approve WF spec",
name: "Auto approve new WF spec",
before: func(svc *TestService) {
svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, argsWF.RemoteUUID).Return(new(feeds.JobProposal), sql.ErrNoRows)
svc.orm.On("UpsertJobProposal", mock.Anything, &jpWF).Return(proposalIDWF, nil)
Expand All @@ -727,13 +742,66 @@ targets:
})
// Auto approve is really a call to ApproveJobProposal and so we have to mock that as well
svc.connMgr.On("GetClient", argsWF.FeedsManagerID).Return(svc.fmsClient, nil)
svc.orm.EXPECT().GetSpec(mock.Anything, jobProposalSpecIdWF).Return(&proposalSpecWF, nil)
svc.orm.EXPECT().GetJobProposal(mock.Anything, proposalSpecWF.JobProposalID).Return(&jpWF, nil)
svc.orm.EXPECT().GetSpec(mock.Anything, jobProposalSpecIdWF).Return(&autoApprovableProposalSpecWF, nil)
svc.orm.EXPECT().GetJobProposal(mock.Anything, autoApprovableProposalSpecWF.JobProposalID).Return(&acceptedjpWF, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)

svc.jobORM.On("FindJobByExternalJobID", mock.Anything, mock.Anything).Return(job.Job{}, sql.ErrNoRows)
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
svc.jobORM.On("FindJobIDByWorkflow", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) // no existing job
svc.spawner.
On("CreateJob",
mock.Anything,
mock.Anything,
mock.MatchedBy(func(j *job.Job) bool {
return j.WorkflowSpec.WorkflowOwner == wfOwner
}),
).
Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }).
Return(nil)
svc.orm.On("ApproveSpec",
mock.Anything,
jobProposalSpecIdWF,
mock.IsType(uuid.UUID{}),
).Return(nil)
svc.fmsClient.On("ApprovedJob",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
&proto.ApprovedJobRequest{
Uuid: jpWF.RemoteUUID.String(),
Version: int64(proposalSpecWF.Version),
},
).Return(&proto.ApprovedJobResponse{}, nil)
},
args: argsWF,
wantID: proposalIDWF,
},

{
name: "Auto approve existing WF spec found by FindJobIDByWorkflow",
before: func(svc *TestService) {
svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, argsWF.RemoteUUID).Return(new(feeds.JobProposal), sql.ErrNoRows)
svc.orm.On("UpsertJobProposal", mock.Anything, &jpWF).Return(proposalIDWF, nil)
svc.orm.On("CreateSpec", mock.Anything, proposalSpecWF).Return(jobProposalSpecIdWF, nil)
svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil)
transactCall := svc.orm.On("Transact", mock.Anything, mock.Anything)
transactCall.Run(func(args mock.Arguments) {
fn := args[1].(func(orm feeds.ORM) error)
transactCall.ReturnArguments = mock.Arguments{fn(svc.orm)}
})
// Auto approve is really a call to ApproveJobProposal and so we have to mock that as well
svc.connMgr.On("GetClient", argsWF.FeedsManagerID).Return(svc.fmsClient, nil)
svc.orm.EXPECT().GetSpec(mock.Anything, jobProposalSpecIdWF).Return(&autoApprovableProposalSpecWF, nil)
svc.orm.EXPECT().GetJobProposal(mock.Anything, autoApprovableProposalSpecWF.JobProposalID).Return(&acceptedjpWF, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)

svc.jobORM.On("FindJobByExternalJobID", mock.Anything, mock.Anything).Return(job.Job{}, sql.ErrNoRows) // TODO fix the external job id in wf spec generation
svc.jobORM.On("FindJobByExternalJobID", mock.Anything, mock.Anything).Return(job.Job{}, sql.ErrNoRows)
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
svc.jobORM.On("FindJobIDByWorkflow", mock.Anything, mock.Anything).Return(jobIDWF, sql.ErrNoRows)
svc.orm.On("GetApprovedSpec", mock.Anything, acceptedjpWF.ID).Return(&autoApprovableProposalSpecWF, nil)
svc.orm.On("CancelSpec", mock.Anything, autoApprovableProposalSpecWF.ID).Return(nil)
svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, jobIDWF).Return(nil)
svc.spawner.
On("CreateJob",
mock.Anything,
Expand Down Expand Up @@ -762,12 +830,11 @@ targets:
},

{
name: "Auto approve WF spec: error creating job",
name: "Auto approve WF spec: error creating job for new spec",
before: func(svc *TestService) {
svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, argsWF.RemoteUUID).Return(new(feeds.JobProposal), sql.ErrNoRows)
svc.orm.On("UpsertJobProposal", mock.Anything, &jpWF).Return(proposalIDWF, nil)
svc.orm.On("CreateSpec", mock.Anything, proposalSpecWF).Return(jobProposalSpecIdWF, nil)
// svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil)
transactCall := svc.orm.On("Transact", mock.Anything, mock.Anything)
transactCall.Run(func(args mock.Arguments) {
fn := args[1].(func(orm feeds.ORM) error)
Expand All @@ -779,9 +846,10 @@ targets:
svc.orm.EXPECT().GetJobProposal(mock.Anything, proposalSpecWF.JobProposalID).Return(&jpWF, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)

svc.jobORM.On("FindJobByExternalJobID", mock.Anything, mock.Anything).Return(job.Job{}, sql.ErrNoRows) // TODO fix the external job id in wf spec generation
svc.jobORM.On("FindJobByExternalJobID", mock.Anything, mock.Anything).Return(job.Job{}, sql.ErrNoRows)
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
svc.jobORM.On("FindJobIDByWorkflow", mock.Anything, mock.Anything).Return(int32(0), sql.ErrNoRows) // no existing job
svc.spawner.
On("CreateJob",
mock.Anything,
Expand Down

0 comments on commit 3456651

Please sign in to comment.