Skip to content

Commit

Permalink
feat(storage/dataflux): run worksteal listing parallel to sequential …
Browse files Browse the repository at this point in the history
…listing (#10966)
  • Loading branch information
akansha1812 authored Nov 25, 2024
1 parent ebf3657 commit 3005f5a
Show file tree
Hide file tree
Showing 12 changed files with 692 additions and 193 deletions.
5 changes: 2 additions & 3 deletions storage/dataflux/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"google.golang.org/api/iterator"
)

func ExampleNextBatch_batch() {
func ExampleLister() {
ctx := context.Background()
// Pass in any client opts or set retry policy here.
client, err := storage.NewClient(ctx)
Expand All @@ -42,8 +42,7 @@ func ExampleNextBatch_batch() {
SkipDirectoryObjects: false,
}

// Create Lister with desired options, including number of workers,
// part size, per operation timeout, etc.
// Create Lister with fast-list input.
df := dataflux.NewLister(client, in)
defer df.Close()

Expand Down
237 changes: 155 additions & 82 deletions storage/dataflux/fast_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"runtime"
"strings"
"sync"

"cloud.google.com/go/storage"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -47,9 +48,11 @@ type ListerInput struct {
// Default value is 10x number of available CPU. Optional.
Parallelism int

// BatchSize is the number of objects to list. Default value returns
// all objects at once. The number of objects returned will be
// rounded up to a multiple of gcs page size. Optional.
// BatchSize is the minimum number of objects to list in each batch.
// The number of objects returned in a batch will be rounded up to
// include all the objects received in the last request to GCS.
// By default, the Lister returns all objects in one batch.
// Optional.
BatchSize int

// Query is the query to filter objects for listing. Default value is nil.
Expand All @@ -58,10 +61,40 @@ type ListerInput struct {
Query storage.Query

// SkipDirectoryObjects is to indicate whether to list directory objects.
// Default value is false. Optional.
// Note: Even if directory objects are excluded, they contribute to the
// [ListerInput.BatchSize] count. Default value is false. Optional.
SkipDirectoryObjects bool
}

// NewLister creates a new [Lister] that can be used to list objects in the given bucket.
//
// If in.Parallelism is not positive, the default of 10x the number of
// available CPUs is used. The caller's ListerInput is left unmodified.
func NewLister(c *storage.Client, in *ListerInput) *Lister {
	// Resolve the worker count in a local variable rather than writing the
	// default back into the caller-owned input. Negative values are treated
	// like zero, because a negative channel capacity below would panic.
	parallelism := in.Parallelism
	if parallelism <= 0 {
		parallelism = runtime.NumCPU() * 10
	}

	// Seed the range channel with a single range derived from the query's
	// prefix, start offset and end offset. When the whole namespace is to be
	// listed (the default), start and end are empty.
	start, end := prefixAdjustedOffsets(in.Query.StartOffset, in.Query.EndOffset, in.Query.Prefix)
	ranges := make(chan *listRange, parallelism*2)
	ranges <- &listRange{startRange: start, endRange: end}

	// The Lister starts in the open method with an empty page token.
	return &Lister{
		method:               open,
		parallelism:          parallelism,
		pageToken:            "",
		bucket:               c.Bucket(in.BucketName),
		batchSize:            in.BatchSize,
		query:                in.Query,
		skipDirectoryObjects: in.SkipDirectoryObjects,
		ranges:               ranges,
	}
}

// Lister is used for interacting with Dataflux fast-listing. The caller should
// initialize it with NewLister() instead of creating it directly.
type Lister struct {
Expand Down Expand Up @@ -92,116 +125,156 @@ type Lister struct {
skipDirectoryObjects bool
}

// NewLister creates a new dataflux Lister to list objects in the given bucket.
//
// If in.Parallelism is not positive, the default of 10x the number of
// available CPUs is used. The caller's ListerInput is left unmodified.
func NewLister(c *storage.Client, in *ListerInput) *Lister {
	// Resolve the worker count in a local variable rather than writing the
	// default back into the caller-owned input. Negative values are treated
	// like zero, because a negative channel capacity below would panic.
	parallelism := in.Parallelism
	if parallelism <= 0 {
		parallelism = runtime.NumCPU() * 10
	}

	// Seed the range channel with a single range derived from the query's
	// prefix, start offset and end offset. When the whole namespace is to be
	// listed (the default), start and end are empty.
	start, end := updateStartEndOffset(in.Query.StartOffset, in.Query.EndOffset, in.Query.Prefix)
	ranges := make(chan *listRange, parallelism*2)
	ranges <- &listRange{startRange: start, endRange: end}

	// The Lister starts in the open method with an empty page token.
	return &Lister{
		method:               open,
		parallelism:          parallelism,
		pageToken:            "",
		bucket:               c.Bucket(in.BucketName),
		batchSize:            in.BatchSize,
		query:                in.Query,
		skipDirectoryObjects: in.SkipDirectoryObjects,
		ranges:               ranges,
	}
}

// NextBatch returns the next N objects in the bucket, where N is [ListerInput.BatchSize].
// In case of failure, all processes are stopped and an error is returned immediately.
// Create a new Lister to retry.
//
// For the first batch, worksteal listing and sequential listing both run in parallel to
// quickly list N objects in the bucket. For subsequent batches, only the method that
// returned objects faster in the first batch is used. For smaller datasets, sequential
// listing is expected to be faster. For larger datasets, worksteal listing is expected
// to be faster.
//
// The worksteal algorithm lists objects in a GCS bucket using multiple parallel
// workers; each worker in the list operation is able to steal work from its siblings
// once it has finished all currently slated listing work.
//
// NextBatch returns iterator.Done together with the final results once the page token
// is empty and no ranges remain.
func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error) {
// NOTE(review): this span appears to interleave pre-change and post-change lines of
// the same function (results is declared twice, and both countError and countErr are
// present) - confirm against the actual file before building.
// countError tracks the number of failed listing methods.
countError := 0
var results []*storage.ObjectAttrs
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Errgroup takes care of running both methods in parallel. As soon as one of
// the methods is complete, the other running method also stops.
g, childCtx := errgroup.WithContext(ctx)

// To start, the listing method is open and both worksteal and sequential listing run
// in parallel. The method which completes first is used for all subsequent runs.

// TODO: Run worksteal listing when method is Open or WorkSteal.
var results []*storage.ObjectAttrs

// Run sequential listing when method is Open or Sequential.
if c.method != worksteal {
// For the first batch, the listing method is open and both worksteal and sequential
// listing run in parallel. The method which completes first is used for all
// subsequent NextBatch calls.
switch c.method {
case worksteal:
// Run worksteal algorithm for listing.
objects, err := c.workstealListing(ctx)
if err != nil {
return nil, fmt.Errorf("worksteal listing: %w", err)
}
results = objects
case sequential:
// Run GCS sequential listing.
objects, token, err := c.sequentialListing(ctx)
if err != nil {
return nil, fmt.Errorf("sequential listing: %w", err)
}
results = objects
c.pageToken = token
c.ranges = nil
case open:
// countErr tracks the number of failed listing methods.
countErr := &countErr{counter: 0}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Errgroup takes care of running both methods in parallel. As soon as one of
// the methods is complete, the other running method also stops.
g, ctx := errgroup.WithContext(ctx)
wsCompletedfirst := false
seqCompletedfirst := false
var wsObjects []*storage.ObjectAttrs
var seqObjects []*storage.ObjectAttrs
var nextToken string
g.Go(func() error {
objects, err := c.workstealListing(ctx)
if err != nil {
countErr.increment()
return fmt.Errorf("worksteal listing: %w", err)
}
// Cancel the context when worksteal listing is complete.
cancel()
wsCompletedfirst = true
wsObjects = objects

return nil
})
g.Go(func() error {
objects, nextToken, err := c.sequentialListing(childCtx)
objects, token, err := c.sequentialListing(ctx)
if err != nil {
countError++
return fmt.Errorf("error in running sequential listing: %w", err)
countErr.increment()
return fmt.Errorf("sequential listing: %w", err)
}
// If sequential listing completes first, set method to sequential listing
// and ranges to nil. The nextToken will be used to continue sequential listing.
results = objects
c.pageToken = nextToken
c.method = sequential
// Cancel the context when sequential listing is complete.
cancel()
seqCompletedfirst = true
seqObjects = objects
nextToken = token

return nil
})
}

// Wait for the listing goroutines; they stop once either sequential listing or
// worksteal listing is complete.
err := g.Wait()

// If the error is not context.Canceled, then return the error instead of falling back
// to the other method. This is so that the error can be fixed and the user can take
// advantage of fast-listing.
// As one of the listing methods completes, it is expected to cancel the context for
// the other method. Only if both sequential and worksteal listing fail due to
// context canceled is an error returned.
if err != nil && (!errors.Is(err, context.Canceled) || countError > 1) {
return nil, fmt.Errorf("failed waiting for sequntial and work steal lister : %w", err)
// Wait for the listing goroutines; they stop once either sequential listing or
// worksteal listing is complete.
err := g.Wait()

// If the error is not context.Canceled, then return the error instead of falling back
// to the other method. This is so that the error can be fixed and the user can take
// advantage of fast-listing.
// As one of the listing methods completes, it is expected to cancel the context and
// cause a context canceled error for the other method. Since context canceled is
// expected, it will not be considered an error. If both sequential and worksteal
// listing fail due to context canceled, then return the error.
if err != nil && (!errors.Is(err, context.Canceled) || countErr.counter > 1) {
return nil, fmt.Errorf("dataflux: %w", err)
}
if wsCompletedfirst {
// If worksteal listing completes first, set method to worksteal listing and pageToken to "".
// The c.ranges channel will be used to continue worksteal listing.
results = wsObjects
c.pageToken = ""
c.method = worksteal
} else if seqCompletedfirst {
// If sequential listing completes first, set method to sequential listing
// and ranges to nil. The nextToken will be used to continue sequential listing.
results = seqObjects
c.pageToken = nextToken
c.method = sequential
c.ranges = nil
}
}

// If the ranges for worksteal listing and the pageToken for sequential listing are
// both empty, then listing is complete.
if c.pageToken == "" {
if c.pageToken == "" && len(c.ranges) == 0 {
return results, iterator.Done
}
return results, nil
}

// Close is used to close the Lister. It closes the Lister's range channel if
// the Lister still holds one; otherwise it does nothing. Calling Close more
// than once is safe.
func (c *Lister) Close() {
	if c.ranges == nil {
		return
	}
	close(c.ranges)
	// Drop the reference so that a repeated Close is a no-op instead of
	// panicking on an already-closed channel.
	c.ranges = nil
}

// updateStartEndOffset updates start and end offset based on prefix.
// If a prefix is given, adjust start and end value such that it lists
// objects with the given prefix. updateStartEndOffset assumes prefix will
// be added to the object name while listing objects in worksteal algorithm.
// countErr is a counter guarded by a mutex so that it can be incremented
// from multiple goroutines.
type countErr struct {
	mu      sync.Mutex
	counter int
}

// increment adds one to the counter while holding the mutex.
func (e *countErr) increment() {
	e.mu.Lock()
	e.counter++
	e.mu.Unlock()
}

// prefixAdjustedOffsets returns a start and end offset adjusted from the given offsets
// based on the prefix, with the prefix stripped. The prefix can be added back to these
// offsets when they are used, so that the original offsets do not need to be checked.
//
// This means that if the given offsets are out of range of the prefix
// (for example, offsets {start: "a", end: "b"} with prefix "c", which is lexicographically
// outside of "a" to "b"), the returned offsets will ensure no strings fall in their range.
//
// Otherwise, if an offset is too permissive given the prefix, an empty string is returned
// for it to indicate there is no offset and all objects starting from or ending at the
// prefix should be listed.
//
// For example:
// start = "abc", end = "prefix_a", prefix = "prefix",
//
// end will change to "_a", prefix will be added in worksteal algorithm.
// "abc" is lexicographically smaller than "prefix". So start will be the first
// object with the given prefix.
// "abc" is lexicographically smaller than "prefix". The start offset indicates first
//
// Therefore start will change to ""(empty string) and end to "_a" .
func updateStartEndOffset(start, end, prefix string) (string, string) {
// object with the given prefix should be listed, therefore the start offset will be empty.
// The end offset will change to "_a" as the prefix is stripped.
// Therefore new offset will change to {start = "", end = "_a" }.
func prefixAdjustedOffsets(start, end, prefix string) (string, string) {
if prefix == "" {
return start, end
}
Expand Down
Loading

0 comments on commit 3005f5a

Please sign in to comment.