Skip to content

Commit

Permalink
Loop over pages to get requested object count. (#29)
Browse files Browse the repository at this point in the history
Loop over pages to get requested object count
  • Loading branch information
foodprocessor authored Nov 2, 2023
1 parent 2bb3c6c commit 569e602
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 87 deletions.
46 changes: 22 additions & 24 deletions component/s3storage/s3storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,46 +233,44 @@ func (s3 *S3Storage) ReadDir(options internal.ReadDirOptions) ([]*internal.ObjAt

func (s3 *S3Storage) StreamDir(options internal.StreamDirOptions) ([]*internal.ObjAttr, string, error) {
log.Trace("S3Storage::StreamDir : %s, offset %d, count %d", options.Name, options.Offset, options.Count)
objectList := make([]*internal.ObjAttr, 0)

path := formatListDirName(options.Name)
var iteration int // = 0
var marker *string = &options.Token // = nil
var totalEntriesFetched int32
for totalEntriesFetched < options.Count {
newList, newMarker, err := s3.storage.List(path, marker, options.Count-totalEntriesFetched)
if err != nil {
log.Err("S3Storage::StreamDir : %s Failed to read dir [%s]", options.Name, err)
return objectList, "", err
}
objectList = append(objectList, newList...)
marker = newMarker
iteration++
totalEntriesFetched += int32(len(objectList))

newList, newMarker, err := s3.storage.List(path, &options.Token, options.Count)
if err != nil {
log.Err("S3Storage::StreamDir : %s Failed to read dir [%s]", options.Name, err)
return newList, "", err
}

log.Debug("S3Storage::StreamDir : %s Retrieved %d objects with marker %s", options.Name, len(newList), options.Token)

if newMarker != nil && *newMarker != "" {
log.Debug("S3Storage::StreamDir : %s next-marker %s", options.Name, *newMarker)
if len(newList) == 0 {
/* In some customer scenario we have seen that newList is empty but marker is not empty
which means backend has not returned any items this time but there are more left.
If we return back this empty list to fuse layer it will assume listing has completed
and will terminate the readdir call. As there are more items left on the server side we
need to retry getting a list here.
*/
log.Warn("S3Storage::StreamDir : %s next-marker %s but current list is empty. Need to retry listing", options.Name, *newMarker)
options.Token = *newMarker
return s3.StreamDir(options)
log.Debug("S3Storage::StreamDir : %s So far retrieved %d objects in %d iterations", options.Name, totalEntriesFetched, iteration)
if marker == nil || *marker == "" {
break
}
}
if newMarker == nil {

if marker == nil {
blnkStr := ""
newMarker = &blnkStr
marker = &blnkStr
}

// if path is empty, it means it is the root, relative to the mounted directory
if len(path) == 0 {
path = "/"
}
s3StatsCollector.PushEvents(streamDir, path, map[string]interface{}{count: len(newList)})
s3StatsCollector.PushEvents(streamDir, path, map[string]interface{}{count: totalEntriesFetched})

// increment streamDir call count
s3StatsCollector.UpdateStats(stats_manager.Increment, streamDir, (int64)(1))

return newList, *newMarker, nil
return objectList, *marker, nil
}

func (s3 *S3Storage) RenameDir(options internal.RenameDirOptions) error {
Expand Down
121 changes: 58 additions & 63 deletions component/s3storage/s3wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,6 @@ func (cl *Client) List(prefix string, marker *string, count int32) ([]*internal.
// Check for an empty path to prevent indexing to [-1]
findCommonPrefixes := listPath == "" || listPath[len(listPath)-1] == '/'

// create a map to keep track of all directories
var dirList = make(map[string]bool)
var newMarker *string
var token *string

Expand All @@ -339,65 +337,64 @@ func (cl *Client) List(prefix string, marker *string, count int32) ([]*internal.
paginator := s3.NewListObjectsV2Paginator(cl.awsS3Client, params)
// initialize list to be returned
objectAttrList := make([]*internal.ObjAttr, 0)
// fetch and process result pages

if paginator.HasMorePages() {
output, err := paginator.NextPage(context.Background())
if err != nil {
log.Err("Client::List : Failed to list objects in bucket %v with prefix %v. Here's why: %v", prefix, bucketName, err)
return objectAttrList, nil, err
}
// fetch and process a single result page
output, err := paginator.NextPage(context.Background())
if err != nil {
log.Err("Client::List : Failed to list objects in bucket %v with prefix %v. Here's why: %v", prefix, bucketName, err)
return objectAttrList, nil, err
}

if output.IsTruncated {
newMarker = output.NextContinuationToken
} else {
newMarker = nil
}
if output.IsTruncated {
newMarker = output.NextContinuationToken
} else {
newMarker = nil
}

// documentation for this S3 data structure:
// https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/[email protected]#ListObjectsV2Output
for _, value := range output.Contents {
// push object info into the list
name, isSymLink := cl.getFile(*value.Key)
// documentation for this S3 data structure:
// https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/[email protected]#ListObjectsV2Output
for _, value := range output.Contents {
// push object info into the list
name, isSymLink := cl.getFile(*value.Key)

path := split(cl.Config.prefixPath, name)
attr := createObjAttr(path, value.Size, *value.LastModified, isSymLink)
objectAttrList = append(objectAttrList, attr)
}
path := split(cl.Config.prefixPath, name)
attr := createObjAttr(path, value.Size, *value.LastModified, isSymLink)
objectAttrList = append(objectAttrList, attr)
}

if findCommonPrefixes {
// documentation for CommonPrefixes:
// https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/[email protected]/types#CommonPrefix
for _, value := range output.CommonPrefixes {
dir := *value.Prefix
dirList[dir] = true
// let's extract and add intermediate directories
// first cut the listPath (the full prefix path) off of the directory path
_, intermediatePath, listPathFound := strings.Cut(dir, listPath)
// if the listPath isn't here, that's weird
if !listPathFound {
log.Warn("Prefix mismatch with path %v when listing objects in %v.", dir, listPath)
if findCommonPrefixes {
// documentation for CommonPrefixes:
// https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/service/[email protected]/types#CommonPrefix
// create a map to keep track of all directories
var dirList = make(map[string]bool)
for _, value := range output.CommonPrefixes {
dir := *value.Prefix
dirList[dir] = true
// let's extract and add intermediate directories
// first cut the listPath (the full prefix path) off of the directory path
_, intermediatePath, listPathFound := strings.Cut(dir, listPath)
// if the listPath isn't here, that's weird
if !listPathFound {
log.Warn("Prefix mismatch with path %v when listing objects in %v.", dir, listPath)
}
// get an array of intermediate directories
intermediateDirectories := strings.Split(intermediatePath, "/")
// walk up the tree and add each one until we find an already existing parent
// we have to iterate in descending order
suffixToTrim := ""
for i := len(intermediateDirectories) - 1; i >= 0; i-- {
// ignore empty strings (split does not omit them)
if intermediateDirectories[i] == "" {
continue
}
// get an array of intermediate directories
intermediateDirectories := strings.Split(intermediatePath, "/")
// walk up the tree and add each one until we find an already existing parent
// we have to iterate in descending order
suffixToTrim := ""
for i := len(intermediateDirectories) - 1; i >= 0; i-- {
// ignore empty strings (split does not omit them)
if intermediateDirectories[i] == "" {
continue
}
// add to the suffix we're trimming off
suffixToTrim = intermediateDirectories[i] + "/" + suffixToTrim
// get the trimmed (parent) directory
parentDir := strings.TrimSuffix(dir, suffixToTrim)
// have we seen this one already?
if dirList[parentDir] {
break
}
dirList[parentDir] = true
// add to the suffix we're trimming off
suffixToTrim = intermediateDirectories[i] + "/" + suffixToTrim
// get the trimmed (parent) directory
parentDir := strings.TrimSuffix(dir, suffixToTrim)
// have we seen this one already?
if dirList[parentDir] {
break
}
dirList[parentDir] = true
}
}

Expand All @@ -411,17 +408,15 @@ func (cl *Client) List(prefix string, marker *string, count int32) ([]*internal.
attr := internal.CreateObjAttrDir(path)
objectAttrList = append(objectAttrList, attr)
}

// values should be returned in ascending order by key
// sort the list before returning it
sort.Slice(objectAttrList, func(i, j int) bool {
return objectAttrList[i].Path < objectAttrList[j].Path
})

}

return objectAttrList, newMarker, nil
// values should be returned in ascending order by key
// sort the list before returning it
sort.Slice(objectAttrList, func(i, j int) bool {
return objectAttrList[i].Path < objectAttrList[j].Path
})

return objectAttrList, newMarker, nil
}

// create an object attributes struct
Expand Down

0 comments on commit 569e602

Please sign in to comment.