diff --git a/helpers/getTransfernator.go b/helpers/getTransfernator.go
index a02d2dd..13bfc4e 100644
--- a/helpers/getTransfernator.go
+++ b/helpers/getTransfernator.go
@@ -56,7 +56,7 @@ func newBulkGetRequest(bucketName string, readObjects *[]helperModels.GetObject,
 func createPartialGetObjects(getObject helperModels.GetObject) []ds3Models.Ds3GetObject {
     // handle getting the entire object
     if len(getObject.Ranges) == 0 {
-        return []ds3Models.Ds3GetObject { { Name:getObject.Name }, }
+        return []ds3Models.Ds3GetObject { { Name:getObject.Name } }
     }
     // handle partial object retrieval
     var partialObjects []ds3Models.Ds3GetObject
@@ -123,11 +123,10 @@ func (transceiver *getTransceiver) transfer() (string, error) {
     consumer := newConsumer(&queue, &waitGroup, transceiver.Strategy.BlobStrategy.maxConcurrentTransfers(), doneNotifier)
 
     // Wait for completion of producer-consumer goroutines
-    var aggErr ds3Models.AggregateError
     waitGroup.Add(1) // adding producer and consumer goroutines to wait group
     go consumer.run()
     err = producer.run() // producer will add to waitGroup for every blob retrieval added to queue, and each transfer performed will decrement from waitGroup
     waitGroup.Wait()
 
-    return bulkGetResponse.MasterObjectList.JobId, aggErr.GetErrors()
+    return bulkGetResponse.MasterObjectList.JobId, nil
 }
\ No newline at end of file
diff --git a/helpers/helpersImpl.go b/helpers/helpersImpl.go
index e6f28a2..a7928fe 100644
--- a/helpers/helpersImpl.go
+++ b/helpers/helpersImpl.go
@@ -1,8 +1,14 @@
 package helpers
 
 import (
+    "fmt"
     "github.com/SpectraLogic/ds3_go_sdk/ds3"
+    "github.com/SpectraLogic/ds3_go_sdk/ds3/models"
     helperModels "github.com/SpectraLogic/ds3_go_sdk/helpers/models"
+    "github.com/SpectraLogic/ds3_go_sdk/helpers/ranges"
+    "net/http"
+    "sort"
+    "strings"
 )
 
 type HelperInterface interface {
@@ -20,6 +26,12 @@ type HelperInterface interface {
     // A job ID will be returned if a BP job was successfully created, regardless of
     // whether additional errors occur.
     GetObjects(bucketName string, objects []helperModels.GetObject, strategy ReadTransferStrategy) (string, error)
+
+    // Retrieves the listed objects from the specified bucket on the Black Pearl.
+    // If a get job cannot be created due to insufficient cache space to fulfill an
+    // IN_ORDER processing guarantee, then the job is split across multiple BP jobs.
+    // This allows for the IN_ORDER retrieval of objects that exceed available cache space.
+    GetObjectsSpanningJobs(bucketName string, objects []helperModels.GetObject, strategy ReadTransferStrategy) ([]string, error)
 }
 
 type HelperImpl struct {
@@ -49,3 +61,120 @@ func (helper *HelperImpl) GetObjects(bucketName string, objects []helperModels.G
     transceiver := newGetTransceiver(bucketName, &objects, &strategy, helper.client)
     return transceiver.transfer()
 }
+
+func (helper *HelperImpl) GetObjectsSpanningJobs(bucketName string, objects []helperModels.GetObject, strategy ReadTransferStrategy) ([]string, error) {
+    // Attempt to send the entire job at once
+    jobId, err := helper.GetObjects(bucketName, objects, strategy)
+    if err == nil {
+        // success
+        return []string{jobId}, nil
+    } else if !helper.isCannotPreAllocateError(err) {
+        // error not related to pre-allocation
+        return nil, err
+    }
+
+    // Retrieve each file individually
+    var jobIds []string
+    for _, getObject := range objects {
+        fileJobIds := helper.retrieveIndividualFile(bucketName, getObject, strategy)
+        jobIds = append(jobIds, fileJobIds...)
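+        // note: a single file may itself be split across multiple BP jobs (one per blob)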
+    }
+    return jobIds, nil
+}
+
+func (helper *HelperImpl) isCannotPreAllocateError(err error) bool {
+    badStatusErr, ok := err.(*models.BadStatusCodeError)
+    if !ok || badStatusErr.ActualStatusCode != http.StatusServiceUnavailable {
+        // failed to create bulk get for reason other than 503
+        return false
+    }
+
+    if strings.Contains(badStatusErr.Error(), "GET jobs that have a chunkClientProcessingOrderGuarantee of IN_ORDER must be entirely pre-allocated. Cannot pre-allocate") {
+        return true
+    }
+    return false
+}
+
+func (helper *HelperImpl) retrieveIndividualFile(bucketName string, getObject helperModels.GetObject, strategy ReadTransferStrategy) []string {
+    // Get the blob offsets
+    headObject, err := helper.client.HeadObject(models.NewHeadObjectRequest(bucketName, getObject.Name))
+    if err != nil {
+        getObject.ChannelBuilder.SetFatalError(err)
+        return nil
+    }
+    var offsets []int64
+    for offset := range headObject.BlobChecksums {
+        offsets = append(offsets, offset)
+    }
+
+    sort.Slice(offsets, func(i, j int) bool {
+        return offsets[i] < offsets[j]
+    })
+
+    // Get the object size
+    objectsDetails, err := helper.client.GetObjectsWithFullDetailsSpectraS3(
+        models.NewGetObjectsWithFullDetailsSpectraS3Request().
+            WithBucketId(bucketName).WithName(getObject.Name).
+            WithLatest(true))
+
+    if err != nil {
+        getObject.ChannelBuilder.SetFatalError(err)
+        return nil
+    } else if len(objectsDetails.DetailedS3ObjectList.DetailedS3Objects) < 1 {
+        getObject.ChannelBuilder.SetFatalError(fmt.Errorf("failed to get object details"))
+        return nil
+    }
+
+    // Retrieve the object one blob at a time in order
+    objectCopy := getObject
+    objectEnd := objectsDetails.DetailedS3ObjectList.DetailedS3Objects[0].Size - 1
+    if len(objectCopy.Ranges) == 0 {
+        // If the user didn't specify a range, add a range that covers the entire file
+        // so that we can use the blobRangeFinder to tell us what ranges to specify.
+        objectCopy.Ranges = append(objectCopy.Ranges, models.Range{Start: 0, End: objectEnd})
+    }
+    blobFinder := ranges.NewBlobRangeFinder(&[]helperModels.GetObject{objectCopy})
+
+    var jobIds []string
+    for i, offset := range offsets {
+        var blobEnd int64
+        if i+1 < len(offsets) {
+            blobEnd = offsets[i+1] - 1
+        } else {
+            blobEnd = objectEnd
+        }
+        length := blobEnd - offset + 1
+        blobRanges := blobFinder.GetRanges(objectCopy.Name, offset, length)
+        if len(blobRanges) == 0 {
+            // This blob does not need to be retrieved
+            continue
+        }
+
+        jobId, err := helper.retrieveBlob(bucketName, getObject, blobRanges, strategy)
+        if err != nil {
+            getObject.ChannelBuilder.SetFatalError(err)
+            return nil
+        }
+        jobIds = append(jobIds, jobId)
+
+        if objectCopy.ChannelBuilder.HasFatalError() {
+            // Failed to retrieve a portion of the file, don't bother with the rest
+            break
+        }
+    }
+    return jobIds
+}
+
+func (helper *HelperImpl) retrieveBlob(bucketName string, getObject helperModels.GetObject, blobRanges []models.Range, strategy ReadTransferStrategy) (string, error) {
+    // Since there is only one blob being retrieved, create the job with Order-Guarantee=None so that the
+    // job will wait if cache needs to be reclaimed on the BP before the chunk can be allocated.
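+    // Overall in-order delivery is still preserved because retrieveIndividualFile submits these single-blob jobs sequentially.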
+    getObjectBlob := getObject
+    getObjectBlob.Ranges = blobRanges
+
+    strategyCopy := strategy
+    strategyCopy.Options.ChunkClientProcessingOrderGuarantee = models.JOB_CHUNK_CLIENT_PROCESSING_ORDER_GUARANTEE_NONE
+
+    return helper.GetObjects(bucketName, []helperModels.GetObject{getObjectBlob}, strategyCopy)
+}
diff --git a/helpers/helpersImpl_test.go b/helpers/helpersImpl_test.go
new file mode 100644
index 0000000..6cbfab8
--- /dev/null
+++ b/helpers/helpersImpl_test.go
@@ -0,0 +1,284 @@
+package helpers
+
+import (
+    "crypto/rand"
+    "crypto/sha256"
+    "fmt"
+    "github.com/SpectraLogic/ds3_go_sdk/ds3/buildclient"
+    ds3Models "github.com/SpectraLogic/ds3_go_sdk/ds3/models"
+    "github.com/SpectraLogic/ds3_go_sdk/ds3_utils/ds3Testing"
+    "github.com/SpectraLogic/ds3_go_sdk/helpers/channels"
+    "github.com/SpectraLogic/ds3_go_sdk/helpers/models"
+    "hash"
+    "io"
+    "reflect"
+    "sync"
+    "sync/atomic"
+    "testing"
+    "time"
+)
+
+type randomDataReader struct {
+    curRead       int
+    maxRead       int
+    mutex         sync.Mutex
+    totalFileHash hash.Hash
+    rangesHash    hash.Hash
+    ranges        []ds3Models.Range
+}
+
+func (reader *randomDataReader) Read(p []byte) (int, error) {
+    reader.mutex.Lock()
+    defer reader.mutex.Unlock()
+
+    if reader.totalFileHash == nil {
+        reader.totalFileHash = sha256.New()
+    }
+    if reader.rangesHash == nil {
+        reader.rangesHash = sha256.New()
+    }
+
+    if reader.curRead >= reader.maxRead {
+        return 0, io.EOF
+    }
+    amountToRead := reader.maxRead - reader.curRead
+    if amountToRead > len(p) {
+        amountToRead = len(p)
+    }
+
+    buff := make([]byte, amountToRead)
+    _, _ = rand.Read(buff)
+    reader.totalFileHash.Write(buff)
+    for i := 0; i < len(buff); i++ {
+        p[i] = buff[i]
+
+        // calculate the hash on specified ranges
+        curByteOffset := int64(reader.curRead + i)
+        for _, curRange := range reader.ranges {
+            if curByteOffset >= curRange.Start && curByteOffset <= curRange.End {
+                reader.rangesHash.Write([]byte{buff[i]})
+                break
+            }
+        }
+    }
+
+    reader.curRead += amountToRead
+    return amountToRead, nil
+}
+
+func (reader *randomDataReader) Close() error {
+    return nil
+}
+
+type randomDataReadChannelBuilder struct {
+    randomDataReader *randomDataReader
+    channels.FatalErrorHandler
+}
+
+func (builder *randomDataReadChannelBuilder) IsChannelAvailable(offset int64) bool {
+    return int64(builder.randomDataReader.curRead) == offset
+}
+
+func (builder *randomDataReadChannelBuilder) GetChannel(_ int64) (io.ReadCloser, error) {
+    return builder.randomDataReader, nil
+}
+
+func (builder *randomDataReadChannelBuilder) OnDone(reader io.ReadCloser) {
+    _ = reader.Close()
+}
+
+type consumeWriter struct {
+    size  int64
+    hash  hash.Hash
+    mutex sync.Mutex
+}
+
+func (writer *consumeWriter) Write(p []byte) (n int, err error) {
+    writer.mutex.Lock()
+    defer writer.mutex.Unlock()
+
+    if writer.hash == nil {
+        writer.hash = sha256.New()
+    }
+    atomic.AddInt64(&writer.size, int64(len(p)))
+    writer.hash.Write(p)
+    return len(p), nil
+}
+
+func (writer *consumeWriter) Close() error {
+    return nil
+}
+
+type consumerWriteChannelBuilder struct {
+    writer *consumeWriter
+    channels.FatalErrorHandler
+}
+
+func (builder *consumerWriteChannelBuilder) IsChannelAvailable(_ int64) bool {
+    return true
+}
+
+func (builder *consumerWriteChannelBuilder) GetChannel(_ int64) (io.WriteCloser, error) {
+    return builder.writer, nil
+}
+
+func (builder *consumerWriteChannelBuilder) OnDone(writer io.WriteCloser) {
+    _ = writer.Close()
+}
+
+func TestRetrievingObjectLargerThanCacheInOrder(t *testing.T) {
+    t.Skip("Test reduces BP cache size to 1 GB and does not reset it. Best run against the BP simulator.")
+
+    const minCache = 1073741824 // 1 GB
+    const objectSize = minCache + 100
+
+    testRanges := []ds3Models.Range{{Start: 2, End: 10}, {Start: 30, End: 40}, {Start: 50, End: 60}, {Start: 80, End: objectSize - 1}}
+
+    bucketName := fmt.Sprintf("testBucket-%d", time.Now().UnixNano())
+    objectName := fmt.Sprintf("testObject-%d", time.Now().UnixNano())
+
+    simClient, err := buildclient.FromEnv()
+    ds3Testing.AssertNilError(t, err)
+
+    // Ensure the cache is set to minimum of 1 GB
+    getCacheResp, err := simClient.GetCacheFilesystemsSpectraS3(ds3Models.NewGetCacheFilesystemsSpectraS3Request())
+    ds3Testing.AssertNilError(t, err)
+
+    t.Logf("CACHE:")
+    for i, curCache := range getCacheResp.CacheFilesystemList.CacheFilesystems {
+        t.Logf("%d) %s, (%v)", i, curCache.Id, curCache.MaxCapacityInBytes)
+
+        req := ds3Models.NewModifyCacheFilesystemSpectraS3Request(curCache.Id).WithMaxCapacityInBytes(minCache)
+        modifyResp, err := simClient.ModifyCacheFilesystemSpectraS3(req)
+        ds3Testing.AssertNilError(t, err)
+        t.Logf("modified: %s, (%v)", modifyResp.CacheFilesystem.Id, modifyResp.CacheFilesystem.MaxCapacityInBytes)
+    }
+
+    // Make sure that our test bucket exists
+    _, err = simClient.GetBucketSpectraS3(ds3Models.NewGetBucketSpectraS3Request(bucketName))
+    if err != nil {
+        t.Logf("Creating bucket: %s", bucketName)
+        _, err := simClient.PutBucket(ds3Models.NewPutBucketRequest(bucketName))
+        ds3Testing.AssertNilError(t, err)
+
+        defer func() {
+            _, err := simClient.DeleteBucketSpectraS3(ds3Models.NewDeleteBucketSpectraS3Request(bucketName).WithForce())
+            if err != nil {
+                t.Errorf("failed to delete bucket with force '%s': %v", bucketName, err)
+            }
+        }()
+    }
+
+    // Create the object
+    t.Logf("Creating object: %s", objectName)
+
+    helperWrapper := NewHelpers(simClient)
+
+    objReader := randomDataReader{
+        maxRead: objectSize,
+        ranges:  testRanges,
+    }
+
+    putObject := models.PutObject{
+        PutObject: ds3Models.Ds3PutObject{
+            Name: objectName,
+            Size: int64(objectSize),
+        },
+        ChannelBuilder: &randomDataReadChannelBuilder{
+            randomDataReader:  &objReader,
+            FatalErrorHandler: channels.FatalErrorHandler{},
+        },
+    }
+
+    writeStrategy := WriteTransferStrategy{
+        BlobStrategy: &SimpleBlobStrategy{
+            Delay:                  time.Millisecond * 10,
+            MaxConcurrentTransfers: 5,
+            MaxWaitingTransfers:    1,
+        },
+        Options: WriteBulkJobOptions{MaxUploadSize: &MinUploadSize},
+        Listeners: ListenerStrategy{
+            ErrorCallback: func(objectName string, err error) {
+                t.Errorf("unexpected error on '%s': %v", objectName, err)
+            },
+        },
+    }
+
+    _, err = helperWrapper.PutObjects(bucketName, []models.PutObject{putObject}, writeStrategy)
+    ds3Testing.AssertNilError(t, err)
+
+    defer func() {
+        _, err := simClient.DeleteObject(ds3Models.NewDeleteObjectRequest(bucketName, objectName))
+        if err != nil {
+            t.Errorf("failed to delete object %s: %v", objectName, err)
+        }
+    }()
+
+    testCases := []struct {
+        name   string
+        ranges []ds3Models.Range
+    }{
+        {name: "retrieve entire file", ranges: nil},
+        {name: "retrieve ranges", ranges: testRanges},
+    }
+
+    for _, testCase := range testCases {
+        t.Run(testCase.name, func(t *testing.T) {
+            // reclaim cache since simulator doesn't seem to do it on its own
+            _, err := simClient.ForceFullCacheReclaimSpectraS3(ds3Models.NewForceFullCacheReclaimSpectraS3Request())
+            ds3Testing.AssertNilError(t, err)
+
+            readStrategy := ReadTransferStrategy{
+                BlobStrategy: &SimpleBlobStrategy{
+                    Delay:                  time.Millisecond * 10,
+                    MaxConcurrentTransfers: 5,
+                    MaxWaitingTransfers:    1,
+                },
+                Options: ReadBulkJobOptions{ChunkClientProcessingOrderGuarantee: ds3Models.JOB_CHUNK_CLIENT_PROCESSING_ORDER_GUARANTEE_IN_ORDER},
+                Listeners: ListenerStrategy{
+                    ErrorCallback: func(objectName string, err error) {
+                        t.Errorf("unexpected error on '%s': %v", objectName, err)
+                    },
+                },
+            }
+
+            objWriter := consumeWriter{}
+            getObject := models.GetObject{
+                Name:   objectName,
+                Ranges: testCase.ranges,
+                ChannelBuilder: &consumerWriteChannelBuilder{
+                    writer:            &objWriter,
+                    FatalErrorHandler: channels.FatalErrorHandler{},
+                },
+            }
+            getJobIds, err := helperWrapper.GetObjectsSpanningJobs(bucketName, []models.GetObject{getObject}, readStrategy)
+            ds3Testing.AssertNilError(t, err)
+
+            // verify amount of data retrieved
+            if len(testCase.ranges) == 0 {
+                ds3Testing.AssertInt64(t, "bytes retrieved", objectSize, objWriter.size)
+            } else {
+                expectedDataRetrieved := int64(0)
+                for _, curRange := range testCase.ranges {
+                    expectedDataRetrieved += curRange.End - curRange.Start + 1
+                }
+                ds3Testing.AssertInt64(t, "bytes retrieved", expectedDataRetrieved, objWriter.size)
+            }
+
+            // verify retrieved data hash
+            var readHash []byte
+            writeHash := objWriter.hash.Sum(nil)
+            if len(testCase.ranges) == 0 {
+                readHash = objReader.totalFileHash.Sum(nil)
+            } else {
+                readHash = objReader.rangesHash.Sum(nil)
+            }
+            ds3Testing.AssertBool(t, "hash matches", true, reflect.DeepEqual(readHash, writeHash))
+            ds3Testing.AssertBool(t, "multiple job ids returned", true, len(getJobIds) > 1)
+            t.Logf("Get Jobs: %v", getJobIds)
+
+            ds3Testing.AssertBool(t, "has fatal err", false, getObject.ChannelBuilder.HasFatalError())
+        })
+    }
+}
diff --git a/helpers/ranges/rangeHelper.go b/helpers/ranges/rangeHelper.go
index bc0eacc..01e2380 100644
--- a/helpers/ranges/rangeHelper.go
+++ b/helpers/ranges/rangeHelper.go
@@ -101,7 +101,7 @@ func (rangeFinder *BlobRangeFinderImpl) GetRanges(name string, offset int64, len
     objectRanges := rangeFinder.rangeMap[name]
 
     // get the subset of ranges that intersect the blob boundary
-    blobRanges := []ds3Models.Range{}
+    var blobRanges []ds3Models.Range
     for _, r := range objectRanges {
         if rangeFinder.isIntersection(r, offset, length) {
            blobRanges = append(blobRanges, rangeFinder.getIntersection(r, offset, length))
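
Reviewer note, not part of the patch: the per-blob job arithmetic in `retrieveIndividualFile` is the crux of this change, so below is a minimal standalone sketch of it. The `blobExtents` helper name and the sample offsets are hypothetical; the real code gets the blob start offsets from `HeadObject` and feeds each (offset, length) pair through `ranges.BlobRangeFinder` to intersect any user-supplied ranges.

```go
package main

import (
	"fmt"
	"sort"
)

// blobExtents mirrors the loop in retrieveIndividualFile: given the blob start
// offsets reported for an object and the object's total size, derive the
// (offset, length) extent that each single-blob GET job would cover.
func blobExtents(offsets []int64, objectSize int64) [][2]int64 {
	sort.Slice(offsets, func(i, j int) bool { return offsets[i] < offsets[j] })

	objectEnd := objectSize - 1
	var extents [][2]int64
	for i, offset := range offsets {
		blobEnd := objectEnd // the last blob runs to the end of the object
		if i+1 < len(offsets) {
			blobEnd = offsets[i+1] - 1 // other blobs run up to the next blob's start
		}
		extents = append(extents, [2]int64{offset, blobEnd - offset + 1})
	}
	return extents
}

func main() {
	// Hypothetical example: a 1 GB + 100 byte object (as in the test above)
	// split into three blobs.
	fmt.Println(blobExtents([]int64{0, 536870912, 1073741824}, 1073741924))
	// Output: [[0 536870912] [536870912 536870912] [1073741824 100]]
}
```

Each extent is then intersected with the caller's requested ranges; blobs with an empty intersection are skipped entirely, which is why the ranged test case expects fewer bytes than the object size while still asserting that multiple job IDs were returned.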