Merge pull request #109 from SpectraLogic/retry-get-on-unexpected-eof
OTHER: adding retry around bulk get when a blob only partially transfers
Sharon Shabtai authored Jul 16, 2020
2 parents ab5d074 + 7a4ef88 commit 4a0ad11
Showing 3 changed files with 174 additions and 6 deletions.
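At a high level, the change detects when io.Copy returns fewer bytes than the blob's length and re-requests only the missing byte range, up to a fixed number of attempts. A minimal sketch of that idea (not the SDK code itself; the getRange function type below is a hypothetical stand-in for a ranged GetObject call):

package retrysketch

import (
	"fmt"
	"io"
)

const maxRetries = 5

// getRange stands in for a ranged GetObject call: it copies bytes
// [start, end] of the blob into w and reports how many bytes it wrote.
type getRange func(start, end int64, w io.Writer) (int64, error)

// copyRemainder keeps asking for the missing tail of a blob of the given
// offset and length until it is complete or retries are exhausted.
func copyRemainder(offset, length, written int64, w io.Writer, get getRange) error {
	rangeEnd := offset + length - 1
	for tries := 0; written < length && tries < maxRetries; tries++ {
		n, err := get(offset+written, rangeEnd, w)
		if err != nil {
			fmt.Printf("attempt %d failed: %v\n", tries, err)
		}
		written += n // keep whatever partial progress this attempt made
	}
	if written < length {
		return fmt.Errorf("only wrote %d of %d bytes after %d attempts", written, length, maxRetries)
	}
	return nil
}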
4 changes: 2 additions & 2 deletions ds3_integration/utils/testUtils.go
@@ -60,7 +60,7 @@ func VerifyBookContent(t *testing.T, bookName string, actual io.ReadCloser) {
 	verifyContent(t, expected, actual)
 }
 
-func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64, actual io.ReadCloser) {
+func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64, actual io.Reader) {
 	f, err := os.Open(filePath)
 	ds3Testing.AssertNilError(t, err)
 
@@ -73,7 +73,7 @@ func VerifyPartialFile(t *testing.T, filePath string, length int64, offset int64
 	verifyPartialContent(t, *expected, actual, length)
 }
 
-func verifyPartialContent(t *testing.T, expected []byte, actual io.ReadCloser, length int64) {
+func verifyPartialContent(t *testing.T, expected []byte, actual io.Reader, length int64) {
 	content, err := getNBytesFromReader(actual, length)
 	ds3Testing.AssertNilError(t, err)
 
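The io.ReadCloser → io.Reader widening matters for the integration tests added below: the verify helpers can now consume any reader, such as a temp file or an in-memory buffer, not just an HTTP response body. A runnable sketch of the distinction (hypothetical data):

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	// A bytes.Reader satisfies io.Reader but not io.ReadCloser, so the old
	// signatures could not accept it.
	var actual io.Reader = bytes.NewReader([]byte("partial blob contents"))
	got := make([]byte, 7)
	if _, err := io.ReadFull(actual, got); err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Printf("first 7 bytes: %q\n", got) // "partial"
}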
59 changes: 55 additions & 4 deletions helpers/getProducer.go
@@ -12,6 +12,8 @@ import (
 	"time"
 )
 
+const timesToRetryGettingPartialBlob = 5
+
 type getProducer struct {
 	JobMasterObjectList *ds3Models.MasterObjectList //MOL from put bulk job creation
 	GetObjects          *[]helperModels.GetObject
@@ -146,10 +148,13 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo) Transf
 			return
 		}
 		if bytesWritten != info.blob.Length() {
-			err = fmt.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", info.blob.Name(), info.blob.Offset(), bytesWritten, info.blob.Length())
-			producer.strategy.Listeners.Errored(info.blob.Name(), err)
-			info.channelBuilder.SetFatalError(err)
-			producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error())
+			producer.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", info.blob.Name(), info.blob.Offset(), bytesWritten, info.blob.Length())
+			err := GetRemainingBlob(producer.client, info.bucketName, info.blob, bytesWritten, writer, producer.Logger)
+			if err != nil {
+				producer.strategy.Listeners.Errored(info.blob.Name(), err)
+				info.channelBuilder.SetFatalError(err)
+				producer.Errorf("unable to copy content of object '%s' at offset '%d' from source to destination: %s", info.blob.Name(), info.blob.Offset(), err.Error())
+			}
 		}
 		return
 	}
@@ -166,6 +171,52 @@ func (producer *getProducer) transferOperationBuilder(info getObjectInfo) Transf
 	}
 }
 
+func GetRemainingBlob(client *ds3.Client, bucketName string, blob *helperModels.BlobDescription, amountAlreadyRetrieved int64, writer io.Writer, logger sdk_log.Logger) error {
+	logger.Debugf("starting retry for fetching partial blob '%s' at offset '%d': amount to retrieve %d", blob.Name(), blob.Offset(), blob.Length() - amountAlreadyRetrieved)
+	bytesRetrievedSoFar := amountAlreadyRetrieved
+	timesRetried := 0
+	rangeEnd := blob.Offset() + blob.Length() - 1
+	for bytesRetrievedSoFar < blob.Length() && timesRetried < timesToRetryGettingPartialBlob {
+		rangeStart := blob.Offset() + bytesRetrievedSoFar
+		bytesRetrievedThisRound, err := RetryGettingBlobRange(client, bucketName, blob.Name(), blob.Offset(), rangeStart, rangeEnd, writer, logger)
+		if err != nil {
+			logger.Errorf("failed to get object '%s' at offset '%d', range %d-%d attempt %d: %s", blob.Name(), blob.Offset(), rangeStart, rangeEnd, timesRetried, err.Error())
+		}
+		bytesRetrievedSoFar += bytesRetrievedThisRound
+		timesRetried++
+	}
+
+	if bytesRetrievedSoFar < blob.Length() {
+		return fmt.Errorf("failed to copy all content of object '%s' at offset '%d': only wrote %d of %d bytes", blob.Name(), blob.Offset(), bytesRetrievedSoFar, blob.Length())
+	}
+	return nil
+}
+
+func RetryGettingBlobRange(client *ds3.Client, bucketName string, objectName string, blobOffset int64, rangeStart int64, rangeEnd int64, writer io.Writer, logger sdk_log.Logger) (int64, error) {
+	// perform a naked get call for the rest of the blob that we originally failed to get
+	partOfBlobToFetch := ds3Models.Range{
+		Start: rangeStart,
+		End:   rangeEnd,
+	}
+	getObjRequest := ds3Models.NewGetObjectRequest(bucketName, objectName).
+		WithOffset(blobOffset).
+		WithRanges(partOfBlobToFetch)
+
+	getObjResponse, err := client.GetObject(getObjRequest)
+	if err != nil {
+		return 0, err
+	}
+	defer func() {
+		err := getObjResponse.Content.Close()
+		if err != nil {
+			logger.Warningf("failed to close response body for get object '%s' with range %d-%d: %v", objectName, rangeStart, rangeEnd, err)
+		}
+	}()
+
+	bytesWritten, err := io.Copy(writer, getObjResponse.Content) // copy all content from response reader to destination writer
+	return bytesWritten, err
+}
+
 // Writes a range of a blob to its destination channel
 func writeRangeToDestination(channelBuilder helperModels.WriteChannelBuilder, blobRange ds3Models.Range, content io.Reader) error {
 	writer, err := channelBuilder.GetChannel(blobRange.Start)
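To make the range arithmetic in GetRemainingBlob concrete, here is a small, self-contained example with hypothetical numbers: a blob stored at offset 1000 with length 500 that only yielded 200 bytes is retried with the range 1200-1499, exactly the missing tail.

package main

import "fmt"

func main() {
	var (
		blobOffset int64 = 1000
		blobLength int64 = 500
		written    int64 = 200 // bytes already copied before the short read
	)
	rangeStart := blobOffset + written      // 1200: first missing byte
	rangeEnd := blobOffset + blobLength - 1 // 1499: last byte of the blob
	fmt.Printf("retry range: %d-%d (%d bytes)\n", rangeStart, rangeEnd, rangeEnd-rangeStart+1)
}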
117 changes: 117 additions & 0 deletions helpers_integration/helpersImpl_test.go
@@ -504,3 +504,120 @@ func TestBulkPutAndGetLotsOfFiles(t *testing.T) {
 		t.Errorf("expected to get a BP job ID, but instead got nothing")
 	}
 }
+
+func TestRetryGettingBlobRange(t *testing.T) {
+	defer testutils.DeleteBucketContents(client, testBucket)
+
+	helper := helpers.NewHelpers(client)
+	strategy := newTestTransferStrategy(t)
+
+	// Put a blobbed object to BP
+	const bigFilePath = LargeBookPath + LargeBookTitle
+	writeObj, err := getTestWriteObjectRandomAccess(LargeBookTitle, bigFilePath)
+	ds3Testing.AssertNilError(t, err)
+
+	var writeObjects []helperModels.PutObject
+	writeObjects = append(writeObjects, *writeObj)
+
+	putJobId, err := helper.PutObjects(testBucket, writeObjects, strategy)
+	ds3Testing.AssertNilError(t, err)
+	if putJobId == "" {
+		t.Error("expected to get a BP job ID, but instead got nothing")
+	}
+
+	// Try to get some data from each blob
+	getJob, err := client.GetJobSpectraS3(ds3Models.NewGetJobSpectraS3Request(putJobId))
+	ds3Testing.AssertNilError(t, err)
+
+	blobsChecked := 0
+	for _, curObj := range getJob.MasterObjectList.Objects {
+		for _, blob := range curObj.Objects {
+			func() {
+				// create a temp file for writing the blob to
+				tempFile, err := ioutil.TempFile("", "go-sdk-test-")
+				ds3Testing.AssertNilError(t, err)
+				defer func() {
+					tempFile.Close()
+					os.Remove(tempFile.Name())
+				}()
+
+				// get a range of the blob
+				startRange := blob.Offset + 10 // retrieve subset of blob
+				endRange := blob.Length + blob.Offset - 1
+				bytesWritten, err := helpers.RetryGettingBlobRange(client, testBucket, writeObj.PutObject.Name, blob.Offset, startRange, endRange, tempFile, client.Logger)
+				ds3Testing.AssertNilError(t, err)
+				ds3Testing.AssertInt64(t, "bytes written", endRange-startRange+1, bytesWritten)
+
+				// verify that retrieved partial blob is correct
+				err = tempFile.Sync()
+				ds3Testing.AssertNilError(t, err)
+
+				tempFile.Seek(0, 0)
+				length := endRange - startRange + 1 // verify the full retrieved range, inclusive of both endpoints
+				testutils.VerifyPartialFile(t, bigFilePath, length, startRange, tempFile)
+			}()
+			blobsChecked++
+		}
+	}
+	if blobsChecked == 0 {
+		t.Fatalf("didn't verify any blobs")
+	}
+}
+
+func TestGetRemainingBlob(t *testing.T) {
+	defer testutils.DeleteBucketContents(client, testBucket)
+
+	helper := helpers.NewHelpers(client)
+	strategy := newTestTransferStrategy(t)
+
+	// Put a blobbed object to BP
+	const bigFilePath = LargeBookPath + LargeBookTitle
+	writeObj, err := getTestWriteObjectRandomAccess(LargeBookTitle, bigFilePath)
+	ds3Testing.AssertNilError(t, err)
+
+	var writeObjects []helperModels.PutObject
+	writeObjects = append(writeObjects, *writeObj)
+
+	putJobId, err := helper.PutObjects(testBucket, writeObjects, strategy)
+	ds3Testing.AssertNilError(t, err)
+	if putJobId == "" {
+		t.Error("expected to get a BP job ID, but instead got nothing")
+	}
+
+	// Try to get some data from each blob
+	getJob, err := client.GetJobSpectraS3(ds3Models.NewGetJobSpectraS3Request(putJobId))
+	ds3Testing.AssertNilError(t, err)
+
+	blobsChecked := 0
+	for _, curObj := range getJob.MasterObjectList.Objects {
+		for _, blob := range curObj.Objects {
+			func() {
+				// create a temp file for writing the blob to
+				tempFile, err := ioutil.TempFile("", "go-sdk-test-")
+				ds3Testing.AssertNilError(t, err)
+				defer func() {
+					tempFile.Close()
+					os.Remove(tempFile.Name())
+				}()
+
+				// get the remainder of the blob after skipping some bytes
+				blobDesc := helperModels.NewBlobDescription(*blob.Name, blob.Offset, blob.Length) // avoid shadowing the loop variable
+				var amountToSkip int64 = 10
+				err = helpers.GetRemainingBlob(client, testBucket, &blobDesc, amountToSkip, tempFile, client.Logger)
+				ds3Testing.AssertNilError(t, err)
+
+				// verify that retrieved partial blob is correct
+				err = tempFile.Sync()
+				ds3Testing.AssertNilError(t, err)
+
+				tempFile.Seek(0, 0)
+				length := blobDesc.Length() - amountToSkip
+				testutils.VerifyPartialFile(t, bigFilePath, length, blobDesc.Offset()+amountToSkip, tempFile)
+			}()
+			blobsChecked++
+		}
+	}
+	if blobsChecked == 0 {
+		t.Fatalf("didn't verify any blobs")
+	}
+}
