diff --git a/pkg/apiclient/argo-kube-client.go b/pkg/apiclient/argo-kube-client.go index 0f3ea042619b..b56deb251852 100644 --- a/pkg/apiclient/argo-kube-client.go +++ b/pkg/apiclient/argo-kube-client.go @@ -89,7 +89,7 @@ func newArgoKubeClient(ctx context.Context, clientConfig clientcmd.ClientConfig, func (a *argoKubeClient) NewWorkflowServiceClient() workflowpkg.WorkflowServiceClient { wfArchive := sqldb.NullWorkflowArchive - wfaServer := workflowarchive.NewWorkflowArchiveServer(wfArchive) + wfaServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, argoKubeOffloadNodeStatusRepo) return &errorTranslatingWorkflowServiceClient{&argoKubeWorkflowServiceClient{workflowserver.NewWorkflowServer(a.instanceIDService, argoKubeOffloadNodeStatusRepo, wfaServer)}} } diff --git a/server/apiserver/argoserver.go b/server/apiserver/argoserver.go index 9370bffed562..815698ea6eb9 100644 --- a/server/apiserver/argoserver.go +++ b/server/apiserver/argoserver.go @@ -308,7 +308,7 @@ func (as *argoServer) newGRPCServer(instanceIDService instanceid.Service, offloa } grpcServer := grpc.NewServer(sOpts...) 
- wfArchiveServer := workflowarchive.NewWorkflowArchiveServer(wfArchive) + wfArchiveServer := workflowarchive.NewWorkflowArchiveServer(wfArchive, offloadNodeStatusRepo) infopkg.RegisterInfoServiceServer(grpcServer, info.NewInfoServer(as.managedNamespace, links, columns, navColor)) eventpkg.RegisterEventServiceServer(grpcServer, eventServer) eventsourcepkg.RegisterEventSourceServiceServer(grpcServer, eventsource.NewEventSourceServer()) diff --git a/server/workflow/workflow_server_test.go b/server/workflow/workflow_server_test.go index 73a68bb29fe1..e91e672ba1cb 100644 --- a/server/workflow/workflow_server_test.go +++ b/server/workflow/workflow_server_test.go @@ -590,7 +590,7 @@ func getWorkflowServer() (workflowpkg.WorkflowServiceServer, context.Context) { archivedRepo := &mocks.WorkflowArchive{} - wfaServer := workflowarchive.NewWorkflowArchiveServer(archivedRepo) + wfaServer := workflowarchive.NewWorkflowArchiveServer(archivedRepo, offloadNodeStatusRepo) archivedRepo.On("GetWorkflow", "", "test", "hello-world-9tql2-test").Return(&v1alpha1.Workflow{ ObjectMeta: metav1.ObjectMeta{Name: "hello-world-9tql2-test", Namespace: "test"}, Spec: v1alpha1.WorkflowSpec{ diff --git a/server/workflowarchive/archived_workflow_server.go b/server/workflowarchive/archived_workflow_server.go index 511fe8d3ecc6..e1a70e3bed7b 100644 --- a/server/workflowarchive/archived_workflow_server.go +++ b/server/workflowarchive/archived_workflow_server.go @@ -23,6 +23,7 @@ import ( "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/server/auth" + "github.com/argoproj/argo-workflows/v3/workflow/hydrator" "github.com/argoproj/argo-workflows/v3/workflow/util" sutils "github.com/argoproj/argo-workflows/v3/server/utils" @@ -31,12 +32,14 @@ import ( const disableValueListRetrievalKeyPattern = "DISABLE_VALUE_LIST_RETRIEVAL_KEY_PATTERN" type archivedWorkflowServer struct { - wfArchive 
sqldb.WorkflowArchive + wfArchive sqldb.WorkflowArchive + offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo + hydrator hydrator.Interface } // NewWorkflowArchiveServer returns a new archivedWorkflowServer -func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive) workflowarchivepkg.ArchivedWorkflowServiceServer { - return &archivedWorkflowServer{wfArchive: wfArchive} +func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive, offloadNodeStatusRepo sqldb.OffloadNodeStatusRepo) workflowarchivepkg.ArchivedWorkflowServiceServer { + return &archivedWorkflowServer{wfArchive: wfArchive, offloadNodeStatusRepo: offloadNodeStatusRepo, hydrator: hydrator.New(offloadNodeStatusRepo)} } func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req *workflowarchivepkg.ListArchivedWorkflowsRequest) (*wfv1.WorkflowList, error) { @@ -282,6 +285,7 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } + oriUid := wf.UID _, err = wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Get(ctx, wf.Name, metav1.GetOptions{}) if apierr.IsNotFound(err) { @@ -299,12 +303,30 @@ func (w *archivedWorkflowServer) RetryArchivedWorkflow(ctx context.Context, req } } + log.WithFields(log.Fields{"Dehydrate workflow uid=": wf.UID}).Info("RetryArchivedWorkflow") + // If the Workflow needs to be dehydrated in order to capture and retain all of the previous state for the subsequent workflow, then do so + err = w.hydrator.Dehydrate(wf) + if err != nil { + return nil, sutils.ToStatusError(err, codes.Internal) + } + wf.ObjectMeta.ResourceVersion = "" wf.ObjectMeta.UID = "" result, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).Create(ctx, wf, metav1.CreateOptions{}) if err != nil { return nil, sutils.ToStatusError(err, codes.Internal) } + // if the Workflow was dehydrated before, we need to capture and maintain its previous state for the new Workflow + if !w.hydrator.IsHydrated(wf) { + offloadedNodes, err := 
w.offloadNodeStatusRepo.Get(string(oriUid), wf.GetOffloadNodeStatusVersion()) + if err != nil { + return nil, sutils.ToStatusError(err, codes.Internal) + } + _, err = w.offloadNodeStatusRepo.Save(string(result.UID), wf.Namespace, offloadedNodes) + if err != nil { + return nil, sutils.ToStatusError(err, codes.Internal) + } + } return result, nil } diff --git a/server/workflowarchive/archived_workflow_server_test.go b/server/workflowarchive/archived_workflow_server_test.go index 7d3f9c89fac2..50d68d27db7b 100644 --- a/server/workflowarchive/archived_workflow_server_test.go +++ b/server/workflowarchive/archived_workflow_server_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" authorizationv1 "k8s.io/api/authorization/v1" @@ -17,8 +18,10 @@ import ( kubefake "k8s.io/client-go/kubernetes/fake" k8stesting "k8s.io/client-go/testing" + "github.com/argoproj/argo-workflows/v3/persist/sqldb" "github.com/argoproj/argo-workflows/v3/persist/sqldb/mocks" workflowarchivepkg "github.com/argoproj/argo-workflows/v3/pkg/apiclient/workflowarchive" + "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" argofake "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/fake" "github.com/argoproj/argo-workflows/v3/server/auth" @@ -29,7 +32,10 @@ func Test_archivedWorkflowServer(t *testing.T) { repo := &mocks.WorkflowArchive{} kubeClient := &kubefake.Clientset{} wfClient := &argofake.Clientset{} - w := NewWorkflowArchiveServer(repo) + offloadNodeStatusRepo := &mocks.OffloadNodeStatusRepo{} + offloadNodeStatusRepo.On("IsEnabled", mock.Anything).Return(true) + offloadNodeStatusRepo.On("List", mock.Anything).Return(map[sqldb.UUIDVersion]v1alpha1.Nodes{}, nil) + w := NewWorkflowArchiveServer(repo, offloadNodeStatusRepo) allowed := true kubeClient.AddReactor("create", 
"selfsubjectaccessreviews", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { return true, &authorizationv1.SelfSubjectAccessReview{ diff --git a/util/errors/errors.go b/util/errors/errors.go index 496176070128..2982be216fd7 100644 --- a/util/errors/errors.go +++ b/util/errors/errors.go @@ -37,7 +37,8 @@ func IsTransientErr(err error) bool { apierr.IsServiceUnavailable(err) || isTransientEtcdErr(err) || matchTransientErrPattern(err) || - errors.Is(err, NewErrTransient("")) + errors.Is(err, NewErrTransient("")) || + isTransientSqbErr(err) if isTransient { log.Infof("Transient error: %v", err) } else { @@ -123,3 +124,9 @@ func generateErrorString(err error) string { } return errorString } + +// isTransientSqbErr reports whether err's message contains "upper: no more rows in". +// A nil err is reported as not transient instead of panicking in err.Error(). +func isTransientSqbErr(err error) bool { + return err != nil && strings.Contains(err.Error(), "upper: no more rows in") +}