Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Revert #11761 to avoid argo-server performance issue #12068

Merged
merged 3 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 18 additions & 72 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"io"
"sort"
"strconv"

log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -129,50 +128,7 @@ func (s *workflowServer) GetWorkflow(ctx context.Context, req *workflowpkg.Workf
return wf, nil
}

func cursorPaginationByResourceVersion(items []wfv1.Workflow, resourceVersion string, limit int64, wfList *wfv1.WorkflowList) {
// Sort the workflow list in descending order by resourceVersion.
sort.Slice(items, func(i, j int) bool {
itemIRV, _ := strconv.Atoi(items[i].ResourceVersion)
itemJRV, _ := strconv.Atoi(items[j].ResourceVersion)
return itemIRV > itemJRV
})

// resourceVersion: unique value to identify the version of the object by Kubernetes. It is used for pagination in workflows.
// receivedRV: resourceVersion value used for previous pagination
// Due to the descending sorting above, the items are filtered to have a resourceVersion smaller than receivedRV.
// The data with values smaller than the receivedRV on the current page will be used for the next page.
if resourceVersion != "" {
var newItems []wfv1.Workflow
for _, item := range items {
targetRV, _ := strconv.Atoi(item.ResourceVersion)
receivedRV, _ := strconv.Atoi(resourceVersion)
if targetRV < receivedRV {
newItems = append(newItems, item)
}
items = newItems
}
}

// Indexing list by limit count
if limit != 0 {
endIndex := int(limit)
if endIndex > len(items) || limit == 0 {
endIndex = len(items)
}
wfList.Items = items[0:endIndex]
} else {
wfList.Items = items
}

// Calculate new offset for next page
// For the next pagination, the resourceVersion of the last item is set in the Continue field.
if limit != 0 && len(wfList.Items) == int(limit) {
lastIndex := len(wfList.Items) - 1
wfList.ListMeta.Continue = wfList.Items[lastIndex].ResourceVersion
}
}

func mergeWithArchivedWorkflows(liveWfs wfv1.WorkflowList, archivedWfs wfv1.WorkflowList) *wfv1.WorkflowList {
func mergeWithArchivedWorkflows(liveWfs wfv1.WorkflowList, archivedWfs wfv1.WorkflowList, numWfsToKeep int) *wfv1.WorkflowList {
var mergedWfs []wfv1.Workflow
var uidToWfs = map[types.UID][]wfv1.Workflow{}
for _, item := range liveWfs.Items {
Expand All @@ -195,53 +151,43 @@ func mergeWithArchivedWorkflows(liveWfs wfv1.WorkflowList, archivedWfs wfv1.Work
}
}
}
// The ListMeta of type WorkflowList requires a resourceVersion for the List object.
// While archivedWfs does not have a resourceVersion corresponding to the List object,
// liveWfs does have a resourceVersion corresponding to the List object.
// Therefore, the ListMetadata should contain the ListMetadata value of liveWfs.
return &wfv1.WorkflowList{Items: mergedWfs, ListMeta: liveWfs.ListMeta}
mergedWfsList := wfv1.WorkflowList{Items: mergedWfs, ListMeta: liveWfs.ListMeta}
sort.Sort(mergedWfsList.Items)
numWfs := 0
var finalWfs []wfv1.Workflow
for _, item := range mergedWfsList.Items {
if numWfsToKeep == 0 || numWfs < numWfsToKeep {
finalWfs = append(finalWfs, item)
numWfs += 1
}
}
return &wfv1.WorkflowList{Items: finalWfs, ListMeta: liveWfs.ListMeta}
}

func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.WorkflowListRequest) (*wfv1.WorkflowList, error) {
wfClient := auth.GetWfClient(ctx)

options := &metav1.ListOptions{}
listOption := &metav1.ListOptions{}
if req.ListOptions != nil {
options = req.ListOptions
listOption = req.ListOptions
}

// Save the original Continue and Limit.
resourceVersion := options.Continue
limit := options.Limit

// Search whole with Limit 0.
// Reset the Continue "" to prevent Kubernetes native pagination.
options.Continue = ""
options.Limit = 0

s.instanceIDService.With(options)
wfList, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).List(ctx, *options)
s.instanceIDService.With(listOption)
wfList, err := wfClient.ArgoprojV1alpha1().Workflows(req.Namespace).List(ctx, *listOption)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}

// Search whole with Limit 0.
// Reset the Continue "0" to prevent archive workflow pagination.
options.Continue = "0"
options.Limit = 0
archivedWfList, err := s.wfArchiveServer.ListArchivedWorkflows(ctx, &workflowarchivepkg.ListArchivedWorkflowsRequest{
ListOptions: options,
ListOptions: listOption,
NamePrefix: "",
Namespace: req.Namespace,
})
if err != nil {
log.Warnf("unable to list archived workflows:%v", err)
} else {
if archivedWfList != nil {
wfList = mergeWithArchivedWorkflows(*wfList, *archivedWfList)
wfList = mergeWithArchivedWorkflows(*wfList, *archivedWfList, int(listOption.Limit))
}
}
cursorPaginationByResourceVersion(wfList.Items, resourceVersion, limit, wfList)

cleaner := fields.NewCleaner(req.Fields)
if s.offloadNodeStatusRepo.IsEnabled() && !cleaner.WillExclude("items.status.nodes") {
Expand Down
43 changes: 4 additions & 39 deletions server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"

"sort"

"testing"
"time"

Expand Down Expand Up @@ -665,44 +663,11 @@ func TestMergeWithArchivedWorkflows(t *testing.T) {
wf3 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{UID: "3", CreationTimestamp: metav1.Time{Time: timeNow.Add(3 * time.Second)}}}
liveWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Live, wf2}}
archivedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2, wf1Archived}}
archivedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf1Archived, wf3, wf2}}
expectedWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2, wf1Live}}
mergedWfItems := mergeWithArchivedWorkflows(liveWfList, archivedWfList).Items
sort.Sort(mergedWfItems)
assert.Equal(t, expectedWfList.Items, mergedWfItems)
}

func TestCursorPaginationByResourceVersion(t *testing.T) {
wf1 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "1", Name: "wf1"}}
wf2 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "2", Name: "wf2"}}
wf3 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "3", Name: "wf3"}}
wf4 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "4", Name: "wf4"}}
wf5 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "5", Name: "wf5"}}
wf6 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "6", Name: "wf6"}}
wf7 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "7", Name: "wf7"}}
wf8 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "8", Name: "wf8"}}
wf9 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "9", Name: "wf9"}}
wf10 := v1alpha1.Workflow{
ObjectMeta: metav1.ObjectMeta{ResourceVersion: "10", Name: "wf10"}}

items := []v1alpha1.Workflow{wf2, wf1, wf4, wf3, wf6, wf5, wf8, wf7, wf10, wf9}
wfList := &v1alpha1.WorkflowList{}

cursorPaginationByResourceVersion(items, "8", 5, wfList)
expectedWfList := &v1alpha1.WorkflowList{}
expectedWfList.Items = []v1alpha1.Workflow{wf7, wf6, wf5, wf4, wf3}
expectedWfList.ListMeta.Continue = "3"

assert.Equal(t, expectedWfList, wfList)
expectedShortWfList := v1alpha1.WorkflowList{Items: []v1alpha1.Workflow{wf3, wf2}}
assert.Equal(t, expectedWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 0).Items)
assert.Equal(t, expectedShortWfList.Items, mergeWithArchivedWorkflows(liveWfList, archivedWfList, 2).Items)
}

func TestWatchWorkflows(t *testing.T) {
Expand Down
Loading