From 07999a700ebffcda055c135fdb75b5a8e2284127 Mon Sep 17 00:00:00 2001
From: Rachel Tucker
Date: Fri, 26 Jun 2020 16:00:50 -0600
Subject: [PATCH] OTHER: adding retry around bulk get when a blob only
 partially transfers. If the blob is not retrieved in its entirety, then
 ranged naked gets are used to retrieve the remaining portion of the blob.
 This retry only applies to blobs where no range was specified.

---
 ds3_integration/utils/testUtils.go      |   4 +-
 helpers/getProducer.go                  |  59 +++++++++++-
 helpers_integration/helpersImpl_test.go | 117 ++++++++++++++++++++++++
 3 files changed, 174 insertions(+), 6 deletions(-)

diff --git a/ds3_integration/utils/testUtils.go b/ds3_integration/utils/testUtils.go
index 5b7c388..a30b521 100644
--- a/ds3_integration/utils/testUtils.go
+++ b/ds3_integration/utils/testUtils.go
@@ -60,7 +60,7 @@ func VerifyBookContent(t *testing.T, bookName string, actual io.ReadCloser) {
 	verifyContent(t, expected, actual)
 }
 
-func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64, actual io.ReadCloser) {
+func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64, actual io.Reader) {
 	f, err := os.Open(filePath)
 	ds3Testing.AssertNilError(t, err)
 
@@ -73,7 +73,7 @@ func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64
 	verifyPartialContent(t, *expected, actual, length)
 }
 
-func verifyPartialContent(t *testing.T, expected []byte, actual io.ReadCloser, length int64) {
+func verifyPartialContent(t *testing.T, expected []byte, actual io.Reader, length int64) {
 	content, err := getNBytesFromReader(actual, length)
 	ds3Testing.AssertNilError(t, err)
 
diff --git a/helpers/getProducer.go b/helpers/getProducer.go
index 0591b2f..bb4d31b 100644
--- a/helpers/getProducer.go
+++ b/helpers/getProducer.go
@@ -12,6 +12,8 @@ import (
 	"time"
 )
 
+const timesToRetryGettingPartialBlob = 5
+
 type getProducer struct {
 	JobMasterObjectList *ds3Models.MasterObjectList //MOL from put bulk job creation
 	GetObjects          *[]helperModels.GetObject
@@ -146,10 +148,13 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo) Transf
 			return
 		}
 		if bytesWritten != info.blob.Length() {
-			err = fmt.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", info.blob.Name(), info.blob.Offset(), bytesWritten, info.blob.Length())
-			producer.strategy.Listeners.Errored(info.blob.Name(), err)
-			info.channelBuilder.SetFatalError(err)
-			producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error())
+			producer.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", info.blob.Name(), info.blob.Offset(), bytesWritten, info.blob.Length())
+			err := GetRemainingBlob(producer.client, info.bucketName, info.blob, bytesWritten, writer, producer.Logger)
+			if err != nil {
+				producer.strategy.Listeners.Errored(info.blob.Name(), err)
+				info.channelBuilder.SetFatalError(err)
+				producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error())
+			}
 		}
 		return
 	}
@@ -166,6 +171,52 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo) Transf
 	}
 }
 
+func GetRemainingBlob(client *ds3.Client, bucketName string, blob *helperModels.BlobDescription, amountAlreadyRetrieved int64, writer io.Writer, logger sdk_log.Logger) error {
+	logger.Debugf("starting retry for fetching partial blob '%s' at offset '%d': amount to retrieve %d", blob.Name(), blob.Offset(), blob.Length() - amountAlreadyRetrieved)
+	bytesRetrievedSoFar := amountAlreadyRetrieved
+	timesRetried := 0
+	rangeEnd := blob.Offset() + blob.Length() - 1
+	for bytesRetrievedSoFar < blob.Length() && timesRetried < timesToRetryGettingPartialBlob {
+		rangeStart := blob.Offset() + bytesRetrievedSoFar
+		bytesRetrievedThisRound, err := RetryGettingBlobRange(client, bucketName, blob.Name(), blob.Offset(), rangeStart, rangeEnd, writer, logger)
+		if err != nil {
+			logger.Errorf("failed to get object '%s' at offset '%d', range %d-%d, attempt %d: %s", blob.Name(), blob.Offset(), rangeStart, rangeEnd, timesRetried, err.Error())
+		}
+		bytesRetrievedSoFar += bytesRetrievedThisRound
+		timesRetried++
+	}
+
+	if bytesRetrievedSoFar < blob.Length() {
+		return fmt.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", blob.Name(), blob.Offset(), bytesRetrievedSoFar, blob.Length())
+	}
+	return nil
+}
+
+func RetryGettingBlobRange(client *ds3.Client, bucketName string, objectName string, blobOffset int64, rangeStart int64, rangeEnd int64, writer io.Writer, logger sdk_log.Logger) (int64, error) {
+	// perform a naked get call for the rest of the blob that we originally failed to get
+	partOfBlobToFetch := ds3Models.Range{
+		Start: rangeStart,
+		End:   rangeEnd,
+	}
+	getObjRequest := ds3Models.NewGetObjectRequest(bucketName, objectName).
+		WithOffset(blobOffset).
+		WithRanges(partOfBlobToFetch)
+
+	getObjResponse, err := client.GetObject(getObjRequest)
+	if err != nil {
+		return 0, err
+	}
+	defer func() {
+		err := getObjResponse.Content.Close()
+		if err != nil {
+			logger.Warningf("failed to close response body for get object '%s' with range %d-%d: %v", objectName, rangeStart, rangeEnd, err)
+		}
+	}()
+
+	bytesWritten, err := io.Copy(writer, getObjResponse.Content) // copy all content from response reader to destination writer
+	return bytesWritten, err
+}
+
 // Writes a range of a blob to its destination channel
 func writeRangeToDestination(channelBuilder helperModels.WriteChannelBuilder, blobRange ds3Models.Range, content io.Reader) error {
 	writer, err := channelBuilder.GetChannel(blobRange.Start)
diff --git a/helpers_integration/helpersImpl_test.go b/helpers_integration/helpersImpl_test.go
index d93155c..65abf88 100644
--- a/helpers_integration/helpersImpl_test.go
+++ b/helpers_integration/helpersImpl_test.go
@@ -504,3 +504,120 @@ func TestBulkPutAndGetLotsOfFiles(t *testing.T) {
 		t.Errorf("expected to get a BP job ID, but instead got nothing")
 	}
 }
+
+func TestRetryGettingBlobRange(t *testing.T) {
+	defer testutils.DeleteBucketContents(client, testBucket)
+
+	helper := helpers.NewHelpers(client)
+	strategy := newTestTransferStrategy(t)
+
+	// Put a blobbed object to BP
+	const bigFilePath = LargeBookPath + LargeBookTitle
+	writeObj, err := getTestWriteObjectRandomAccess(LargeBookTitle, bigFilePath)
+	ds3Testing.AssertNilError(t, err)
+
+	var writeObjects []helperModels.PutObject
+	writeObjects = append(writeObjects, *writeObj)
+
+	putJobId, err := helper.PutObjects(testBucket, writeObjects, strategy)
+	ds3Testing.AssertNilError(t, err)
+	if putJobId == "" {
+		t.Error("expected to get a BP job ID, but instead got nothing")
+	}
+
+	// Try to get some data from each blob
+	getJob, err := client.GetJobSpectraS3(ds3Models.NewGetJobSpectraS3Request(putJobId))
+	ds3Testing.AssertNilError(t, err)
+
+	blobsChecked := 0
+	for _, curObj := range getJob.MasterObjectList.Objects {
+		for _, blob := range curObj.Objects {
+			func() {
+				// create a temp file for writing the blob to
+				tempFile, err := ioutil.TempFile("", "go-sdk-test-")
+				ds3Testing.AssertNilError(t, err)
+				defer func() {
+					tempFile.Close()
+					os.Remove(tempFile.Name())
+				}()
+
+				// get a range of the blob
+				startRange := blob.Offset + 10 // retrieve subset of blob
+				endRange := blob.Length + blob.Offset - 1
+				bytesWritten, err := helpers.RetryGettingBlobRange(client, testBucket, writeObj.PutObject.Name, blob.Offset, startRange, endRange, tempFile, client.Logger)
+				ds3Testing.AssertNilError(t, err)
+				ds3Testing.AssertInt64(t, "bytes written", endRange-startRange+1, bytesWritten)
+
+				// verify that retrieved partial blob is correct
+				err = tempFile.Sync()
+				ds3Testing.AssertNilError(t, err)
+
+				tempFile.Seek(0, 0)
+				length := endRange - startRange + 1
+				testutils.VerifyPartialFile(t, bigFilePath, length, startRange, tempFile)
+			}()
+			blobsChecked++
+		}
+	}
+	if blobsChecked == 0 {
+		t.Fatalf("didn't verify any blobs")
+	}
+}
+
+func TestGetRemainingBlob(t *testing.T) {
+	defer testutils.DeleteBucketContents(client, testBucket)
+
+	helper := helpers.NewHelpers(client)
+	strategy := newTestTransferStrategy(t)
+
+	// Put a blobbed object to BP
+	const bigFilePath = LargeBookPath + LargeBookTitle
+	writeObj, err := getTestWriteObjectRandomAccess(LargeBookTitle, bigFilePath)
+	ds3Testing.AssertNilError(t, err)
+
+	var writeObjects []helperModels.PutObject
+	writeObjects = append(writeObjects, *writeObj)
+
+	putJobId, err := helper.PutObjects(testBucket, writeObjects, strategy)
+	ds3Testing.AssertNilError(t, err)
+	if putJobId == "" {
+		t.Error("expected to get a BP job ID, but instead got nothing")
+	}
+
+	// Try to get some data from each blob
+	getJob, err := client.GetJobSpectraS3(ds3Models.NewGetJobSpectraS3Request(putJobId))
+	ds3Testing.AssertNilError(t, err)
+
+	blobsChecked := 0
+	for _, curObj := range getJob.MasterObjectList.Objects {
+		for _, blob := range curObj.Objects {
+			func() {
+				// create a temp file for writing the blob to
+				tempFile, err := ioutil.TempFile("", "go-sdk-test-")
+				ds3Testing.AssertNilError(t, err)
+				defer func() {
+					tempFile.Close()
+					os.Remove(tempFile.Name())
+				}()
+
+				// get the remainder of the blob after skipping some bytes
+				blobDesc := helperModels.NewBlobDescription(*blob.Name, blob.Offset, blob.Length)
+				var amountToSkip int64 = 10
+				err = helpers.GetRemainingBlob(client, testBucket, &blobDesc, amountToSkip, tempFile, client.Logger)
+				ds3Testing.AssertNilError(t, err)
+
+				// verify that retrieved partial blob is correct
+				err = tempFile.Sync()
+				ds3Testing.AssertNilError(t, err)
+
+				tempFile.Seek(0, 0)
+				length := blobDesc.Length() - amountToSkip
+				testutils.VerifyPartialFile(t, bigFilePath, length, blobDesc.Offset()+amountToSkip, tempFile)
+			}()
+			blobsChecked++
+		}
+	}
+	if blobsChecked == 0 {
+		t.Fatalf("didn't verify any blobs")
+	}
+}
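
Usage note: the sketch below shows how the new helpers.GetRemainingBlob entry point could be driven outside of transferOperationBuilder, for example to resume a download that stopped partway through a blob. It is illustrative only, not part of the patch: the import paths follow the SpectraLogic ds3_go_sdk layout, and the bucket name, object name, sizes, and output path are assumed placeholders.

    package main

    import (
    	"log"
    	"os"

    	"github.com/SpectraLogic/ds3_go_sdk/ds3/buildclient"
    	"github.com/SpectraLogic/ds3_go_sdk/helpers"
    	helperModels "github.com/SpectraLogic/ds3_go_sdk/helpers/models"
    )

    func main() {
    	// Build a client from the DS3_* environment variables (endpoint and credentials).
    	client, err := buildclient.FromEnv()
    	if err != nil {
    		log.Fatal(err)
    	}

    	// Suppose a bulk get of this 4 MiB blob (hypothetical name/size) stopped after 1 MiB.
    	blob := helperModels.NewBlobDescription("my-object", 0, 4*1024*1024)
    	var alreadyRetrieved int64 = 1 * 1024 * 1024

    	// Append the recovered tail to the partially written file.
    	out, err := os.OpenFile("my-object.partial", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer out.Close()

    	// Issues up to timesToRetryGettingPartialBlob (5) ranged naked gets for the
    	// missing byte range, writing whatever each attempt returns to the writer.
    	if err := helpers.GetRemainingBlob(client, "my-bucket", &blob, alreadyRetrieved, out, client.Logger); err != nil {
    		log.Fatal(err)
    	}
    }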