From 569e6024ec9d56c4333d582a37f4e2369a1a9d23 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Thu, 2 Nov 2023 10:48:35 -0600 Subject: [PATCH] Loop over pages to get requested object count. (#29) Loop over pages to get requested object count --- component/s3storage/s3storage.go | 46 ++++++------ component/s3storage/s3wrappers.go | 121 ++++++++++++++---------------- 2 files changed, 80 insertions(+), 87 deletions(-) diff --git a/component/s3storage/s3storage.go b/component/s3storage/s3storage.go index a2345e083..bc427d0c8 100644 --- a/component/s3storage/s3storage.go +++ b/component/s3storage/s3storage.go @@ -233,46 +233,44 @@ func (s3 *S3Storage) ReadDir(options internal.ReadDirOptions) ([]*internal.ObjAt func (s3 *S3Storage) StreamDir(options internal.StreamDirOptions) ([]*internal.ObjAttr, string, error) { log.Trace("S3Storage::StreamDir : %s, offset %d, count %d", options.Name, options.Offset, options.Count) + objectList := make([]*internal.ObjAttr, 0) path := formatListDirName(options.Name) + var iteration int // = 0 + var marker *string = &options.Token // = nil + var totalEntriesFetched int32 + for totalEntriesFetched < options.Count { + newList, newMarker, err := s3.storage.List(path, marker, options.Count-totalEntriesFetched) + if err != nil { + log.Err("S3Storage::StreamDir : %s Failed to read dir [%s]", options.Name, err) + return objectList, "", err + } + objectList = append(objectList, newList...) 
+ marker = newMarker + iteration++ + totalEntriesFetched += int32(len(newList)) - newList, newMarker, err := s3.storage.List(path, &options.Token, options.Count) - if err != nil { - log.Err("S3Storage::StreamDir : %s Failed to read dir [%s]", options.Name, err) - return newList, "", err - } - - log.Debug("S3Storage::StreamDir : %s Retrieved %d objects with marker %s", options.Name, len(newList), options.Token) - - if newMarker != nil && *newMarker != "" { - log.Debug("S3Storage::StreamDir : %s next-marker %s", options.Name, *newMarker) - if len(newList) == 0 { - /* In some customer scenario we have seen that newList is empty but marker is not empty - which means backend has not returned any items this time but there are more left. - If we return back this empty list to fuse layer it will assume listing has completed - and will terminate the readdir call. As there are more items left on the server side we - need to retry getting a list here. - */ - log.Warn("S3Storage::StreamDir : %s next-marker %s but current list is empty. 
Need to retry listing", options.Name, *newMarker) - options.Token = *newMarker - return s3.StreamDir(options) + log.Debug("S3Storage::StreamDir : %s So far retrieved %d objects in %d iterations", options.Name, totalEntriesFetched, iteration) + if marker == nil || *marker == "" { + break } } - if newMarker == nil { + + if marker == nil { blnkStr := "" - newMarker = &blnkStr + marker = &blnkStr } // if path is empty, it means it is the root, relative to the mounted directory if len(path) == 0 { path = "/" } - s3StatsCollector.PushEvents(streamDir, path, map[string]interface{}{count: len(newList)}) + s3StatsCollector.PushEvents(streamDir, path, map[string]interface{}{count: totalEntriesFetched}) // increment streamDir call count s3StatsCollector.UpdateStats(stats_manager.Increment, streamDir, (int64)(1)) - return newList, *newMarker, nil + return objectList, *marker, nil } func (s3 *S3Storage) RenameDir(options internal.RenameDirOptions) error { diff --git a/component/s3storage/s3wrappers.go b/component/s3storage/s3wrappers.go index d21eaac74..a2a481958 100644 --- a/component/s3storage/s3wrappers.go +++ b/component/s3storage/s3wrappers.go @@ -315,8 +315,6 @@ func (cl *Client) List(prefix string, marker *string, count int32) ([]*internal. // Check for an empty path to prevent indexing to [-1] findCommonPrefixes := listPath == "" || listPath[len(listPath)-1] == '/' - // create a map to keep track of all directories - var dirList = make(map[string]bool) var newMarker *string var token *string @@ -339,65 +337,64 @@ func (cl *Client) List(prefix string, marker *string, count int32) ([]*internal. paginator := s3.NewListObjectsV2Paginator(cl.awsS3Client, params) // initialize list to be returned objectAttrList := make([]*internal.ObjAttr, 0) - // fetch and process result pages - - if paginator.HasMorePages() { - output, err := paginator.NextPage(context.Background()) - if err != nil { - log.Err("Client::List : Failed to list objects in bucket %v with prefix %v. 
Here's why: %v", prefix, bucketName, err) - return objectAttrList, nil, err - } + // fetch and process a single result page + output, err := paginator.NextPage(context.Background()) + if err != nil { + log.Err("Client::List : Failed to list objects in bucket %v with prefix %v. Here's why: %v", prefix, bucketName, err) + return objectAttrList, nil, err + } - if output.IsTruncated { - newMarker = output.NextContinuationToken - } else { - newMarker = nil - } + if output.IsTruncated { + newMarker = output.NextContinuationToken + } else { + newMarker = nil + } - // documentation for this S3 data structure: - // https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/s3@v1.30.2#ListObjectsV2Output - for _, value := range output.Contents { - // push object info into the list - name, isSymLink := cl.getFile(*value.Key) + // documentation for this S3 data structure: + // https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/s3@v1.30.2#ListObjectsV2Output + for _, value := range output.Contents { + // push object info into the list + name, isSymLink := cl.getFile(*value.Key) - path := split(cl.Config.prefixPath, name) - attr := createObjAttr(path, value.Size, *value.LastModified, isSymLink) - objectAttrList = append(objectAttrList, attr) - } + path := split(cl.Config.prefixPath, name) + attr := createObjAttr(path, value.Size, *value.LastModified, isSymLink) + objectAttrList = append(objectAttrList, attr) + } - if findCommonPrefixes { - // documentation for CommonPrefixes: - // https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/s3@v1.30.2/types#CommonPrefix - for _, value := range output.CommonPrefixes { - dir := *value.Prefix - dirList[dir] = true - // let's extract and add intermediate directories - // first cut the listPath (the full prefix path) off of the directory path - _, intermediatePath, listPathFound := strings.Cut(dir, listPath) - // if the listPath isn't here, that's weird - if !listPathFound { - log.Warn("Prefix mismatch with path %v when listing objects 
in %v.", dir, listPath) + if findCommonPrefixes { + // documentation for CommonPrefixes: + // https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/s3@v1.30.2/types#CommonPrefix + // create a map to keep track of all directories + var dirList = make(map[string]bool) + for _, value := range output.CommonPrefixes { + dir := *value.Prefix + dirList[dir] = true + // let's extract and add intermediate directories + // first cut the listPath (the full prefix path) off of the directory path + _, intermediatePath, listPathFound := strings.Cut(dir, listPath) + // if the listPath isn't here, that's weird + if !listPathFound { + log.Warn("Prefix mismatch with path %v when listing objects in %v.", dir, listPath) + } + // get an array of intermediate directories + intermediateDirectories := strings.Split(intermediatePath, "/") + // walk up the tree and add each one until we find an already existing parent + // we have to iterate in descending order + suffixToTrim := "" + for i := len(intermediateDirectories) - 1; i >= 0; i-- { + // ignore empty strings (split does not omit them) + if intermediateDirectories[i] == "" { + continue } - // get an array of intermediate directories - intermediateDirectories := strings.Split(intermediatePath, "/") - // walk up the tree and add each one until we find an already existing parent - // we have to iterate in descending order - suffixToTrim := "" - for i := len(intermediateDirectories) - 1; i >= 0; i-- { - // ignore empty strings (split does not omit them) - if intermediateDirectories[i] == "" { - continue - } - // add to the suffix we're trimming off - suffixToTrim = intermediateDirectories[i] + "/" + suffixToTrim - // get the trimmed (parent) directory - parentDir := strings.TrimSuffix(dir, suffixToTrim) - // have we seen this one already? 
- if dirList[parentDir] { - break - } - dirList[parentDir] = true + // add to the suffix we're trimming off + suffixToTrim = intermediateDirectories[i] + "/" + suffixToTrim + // get the trimmed (parent) directory + parentDir := strings.TrimSuffix(dir, suffixToTrim) + // have we seen this one already? + if dirList[parentDir] { + break } + dirList[parentDir] = true } } @@ -411,17 +408,15 @@ func (cl *Client) List(prefix string, marker *string, count int32) ([]*internal. attr := internal.CreateObjAttrDir(path) objectAttrList = append(objectAttrList, attr) } - - // values should be returned in ascending order by key - // sort the list before returning it - sort.Slice(objectAttrList, func(i, j int) bool { - return objectAttrList[i].Path < objectAttrList[j].Path - }) - } - return objectAttrList, newMarker, nil + // values should be returned in ascending order by key + // sort the list before returning it + sort.Slice(objectAttrList, func(i, j int) bool { + return objectAttrList[i].Path < objectAttrList[j].Path + }) + return objectAttrList, newMarker, nil } // create an object attributes struct