Skip to content

Commit

Permalink
Dev/robin/8656 blob etag support for forestrie (#31)
Browse files Browse the repository at this point in the history
* feat: etag support for put & read blob

The introduction of ETag support makes it possible to do two things:

1. read/modify/write concurrency guards, so that values are written back
   only if the destination content hasn't been modified since being
   read.
2. Efficient cache refreshing without fetching the blob data. still
   requires request round trip, but no data is returned in the response
   if the content is unchanged since the last read.

1. is going to give us crash fault tollerance and race reconciliaiton
   for updating forestrie blobs.

behaviour change:

Previously Read swallowed any error that wasn't accompanied by an io.EOF
condition. This causes the return states to be very confusing when
dealint with If- header responses. The err is now returned in that case.

The arrangements to ensure body.Read is called remain as before

AB#8656

* feat: propagate ms error codes in write responses

* fix: linter issues

* feat: modified since and unmodified since predicates

+ review fixes

---------

Co-authored-by: Robin Bryce <[email protected]>
  • Loading branch information
robinbryce and Robin Bryce authored Nov 6, 2023
1 parent 83b5d52 commit 232f4a3
Show file tree
Hide file tree
Showing 12 changed files with 559 additions and 76 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.task/
.local/
.vscode/
45 changes: 45 additions & 0 deletions azblob/accessconditions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package azblob

import (
"errors"

azStorageBlob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

func storerOptionConditions(options *StorerOptions) (azStorageBlob.BlobAccessConditions, error) {

var blobAccessConditions azStorageBlob.BlobAccessConditions
if options.leaseID == "" && options.etagCondition == EtagNotUsed {
return blobAccessConditions, nil
}
if options.etag == "" && options.etagCondition != EtagNotUsed {
return blobAccessConditions, errors.New("etag value missing")
}

blobAccessConditions = azStorageBlob.BlobAccessConditions{}
if options.leaseID != "" {
blobAccessConditions.LeaseAccessConditions = &azStorageBlob.LeaseAccessConditions{
LeaseID: &options.leaseID,
}
}

blobAccessConditions.ModifiedAccessConditions = &azStorageBlob.ModifiedAccessConditions{}

switch options.etagCondition {
case ETagMatch:
blobAccessConditions.ModifiedAccessConditions.IfMatch = &options.etag
case ETagNoneMatch:
blobAccessConditions.ModifiedAccessConditions.IfNoneMatch = &options.etag
case TagsWhere:
blobAccessConditions.ModifiedAccessConditions.IfTags = &options.etag
default:
}
switch options.sinceCondition {
case IfConditionModifiedSince:
blobAccessConditions.ModifiedAccessConditions.IfModifiedSince = options.since
case IfConditionUnmodifiedSince:
blobAccessConditions.ModifiedAccessConditions.IfUnmodifiedSince = options.since
default:
}
return blobAccessConditions, nil
}
20 changes: 20 additions & 0 deletions azblob/bytesreadercloser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package azblob

import "bytes"

// BytesSeekableReader closer provides reader that has a No-Op Close and a
// usuable Seek. Because we need Seek, we can't use ioutil.NopCloser
type BytesSeekableReaderCloser struct {
*bytes.Reader
}

func NewBytesReaderCloser(b []byte) *BytesSeekableReaderCloser {
r := &BytesSeekableReaderCloser{
Reader: bytes.NewReader(b),
}
return r
}

func (io *BytesSeekableReaderCloser) Close() error {
return nil
}
68 changes: 29 additions & 39 deletions azblob/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@ import (
"fmt"
"io"
"net/http"
"net/textproto"
"strconv"

azStorageBlob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"

"github.com/rkvst/go-rkvstcommon/logger"
"github.com/rkvst/go-rkvstcommon/scannedstatus"
)

const (
Expand All @@ -39,6 +36,7 @@ func (azp *Storer) getTags(
logger.Sugar.Debugf("getTags BlockBlob Client %s error: %v", identity, err)
return nil, ErrorFromError(err)
}

resp, err := blobClient.GetTags(ctx, nil)
if err != nil {
logger.Sugar.Debugf("getTags BlockBlob URL %s error: %v", identity, err)
Expand Down Expand Up @@ -74,20 +72,6 @@ func (azp *Storer) getMetadata(
return resp.Metadata, nil
}

type ReaderResponse struct {
Reader io.ReadCloser
HashValue string
MimeType string
Size int64
Tags map[string]string
TimestampAccepted string
ScannedStatus string
ScannedBadReason string
ScannedTimestamp string

BlobClient *azStorageBlob.BlobClient
}

// Reader creates a reader.
func (azp *Storer) Reader(
ctx context.Context,
Expand All @@ -105,13 +89,9 @@ func (azp *Storer) Reader(
logger.Sugar.Debugf("Reader BlockBlob URL %s", identity)

resp := &ReaderResponse{}
var blobAccessConditions azStorageBlob.BlobAccessConditions
if options.leaseID != "" {
blobAccessConditions = azStorageBlob.BlobAccessConditions{
LeaseAccessConditions: &azStorageBlob.LeaseAccessConditions{
LeaseID: &options.leaseID,
},
}
blobAccessConditions, err := storerOptionConditions(options)
if err != nil {
return nil, err
}

if len(options.tags) > 0 || options.getTags {
Expand All @@ -127,6 +107,9 @@ func (azp *Storer) Reader(
resp.Tags = tags
}

// XXX: TODO this should be done with access conditions. this is racy as it
// stands. azure guarantees the tags for a blob read after write is
// consistent. we can't take advantage of that while this remains racy.
for k, requiredValue := range options.tags {
blobValue, ok := resp.Tags[k]
if !ok {
Expand All @@ -139,7 +122,9 @@ func (azp *Storer) Reader(
}
}

if options.getMetadata == OnlyMetadata || options.getMetadata == BothMetadataAndBlob {
// If we are *only* getting metadata, issue a distinct request. Otherwise we
// get it from the download response.
if options.getMetadata == OnlyMetadata {
metaData, metadataErr := azp.getMetadata(
ctx,
identity,
Expand All @@ -148,19 +133,9 @@ func (azp *Storer) Reader(
logger.Sugar.Infof("cannot get metadata: %v", metadataErr)
return nil, metadataErr
}
logger.Sugar.Debugf("blob metadata %v", metaData)
size, parseErr := strconv.ParseInt(metaData[textproto.CanonicalMIMEHeaderKey(SizeKey)], 10, 64)
if parseErr != nil {
logger.Sugar.Infof("cannot get size value: %v", parseErr)
return nil, parseErr
if parseErr := readerResponseMetadata(resp, metaData); parseErr != nil {
return nil, err
}
resp.Size = size
resp.HashValue = metaData[textproto.CanonicalMIMEHeaderKey(HashKey)]
resp.MimeType = metaData[textproto.CanonicalMIMEHeaderKey(MimeKey)]
resp.TimestampAccepted = metaData[textproto.CanonicalMIMEHeaderKey(TimeKey)]
resp.ScannedStatus = scannedstatus.FromString(metaData[textproto.CanonicalMIMEHeaderKey(scannedstatus.Key)]).String()
resp.ScannedBadReason = metaData[textproto.CanonicalMIMEHeaderKey(scannedstatus.BadReason)]
resp.ScannedTimestamp = metaData[textproto.CanonicalMIMEHeaderKey(scannedstatus.Timestamp)]
}

if options.getMetadata == OnlyMetadata {
Expand All @@ -180,12 +155,27 @@ func (azp *Storer) Reader(
Count: &countToEnd,
},
)

if err != nil && err == io.EOF { // nolint
logger.Sugar.Infof("cannot get blob body: %v", err)
return nil, ErrorFromError(err)
}
resp.Reader = get.Body(nil)
return resp, nil

normaliseReaderResponseErr(err, resp)
if err == nil {
// We *always* copy the metadata into the response
downloadReaderResponse(get, resp)

// for backwards compat, we only process the metadata on request
if options.getMetadata == BothMetadataAndBlob {
_ = readerResponseMetadata(resp, resp.Metadata) // the parse error is benign
}
}

if get.RawResponse != nil {
resp.Reader = get.Body(nil)
}
return resp, err
}

func (r *ReaderResponse) DownloadToWriter(w io.Writer) error {
Expand Down
17 changes: 17 additions & 0 deletions azblob/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,20 @@ func (e *Error) StatusCode() int {
logger.Sugar.Debugf("Return InternalServerError")
return http.StatusInternalServerError
}

// StorageErrorCode returns the underlying azure storage ErrorCode string eg "BlobNotFound"
func (e *Error) StorageErrorCode() string {
var terr *azStorageBlob.StorageError
if errors.As(e.err, &terr) {
if terr.ErrorCode != "" {
return string(terr.ErrorCode)
}
}
return ""
}

// IsConditionNotMet returns true if the err is the storage code indicating that
// a If- header predicate (eg ETag) was not met
func (e *Error) IsConditionNotMet() bool {
return e.StorageErrorCode() == string(azStorageBlob.StorageErrorCodeConditionNotMet)
}
154 changes: 154 additions & 0 deletions azblob/etags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package azblob

import (
"context"
"fmt"
"testing"

"github.com/google/uuid"
"github.com/rkvst/go-rkvstcommon/logger"
)

func uniqueTestName(testName string, t *testing.T) string {
uid, err := uuid.NewRandom()
if err != nil {
t.Fatalf("%v", err)
return testName
}
return fmt.Sprintf("%s-%s", testName, uid.String())
}

// Tests covering the setting and behaviour of etags. Requires the azurite emulator to be running

// Test_PutIfMatch checks the handling of WithETagMatch, which is typically used
// to re-concile racing updates by guaranteeing a single winner.
func TestPutIfMatch(t *testing.T) {

logger.New("NOOP")
defer logger.OnExit()

testName := uniqueTestName("PutIfMatch", t)

storer, err := NewDev(NewDevConfigFromEnv(), "devcontainer")
if err != nil {
t.Fatalf("failed to connect to blob store emulator: %v", err)
}
client := storer.GetServiceClient()
// This will error if it exists and that is fine
_, err = client.CreateContainer(context.Background(), "devcontainer", nil)
if err != nil {
s := err.Error()
logger.Sugar.Infof("benign err: %v, %s", err, s)
}

blobName := fmt.Sprintf("tests/blobs/%s-%d", testName, 1)

originalValue := []byte("ORIGINAL_VALUE")
secondValue := []byte("SECOND_VALUE")
thirdValue := []byte("THIRD_VALUE")

// establish the original value
wr, err := storer.Put(context.Background(), blobName, NewBytesReaderCloser(originalValue))
if err != nil {
t.Fatalf("failed put original value: %v", err)
}

// put the updated value only if we match the original value
wr2, err := storer.Put(
context.Background(), blobName, NewBytesReaderCloser(secondValue), WithEtagMatch(*wr.ETag))
if err != nil {
t.Fatalf("failed put second value: %v", err)
}

// read back only if it matches the new value
_, err = storer.Reader(context.Background(), blobName, WithEtagMatch(*wr2.ETag))
if err != nil {
t.Fatalf("failed to read value with updated ETag: %v", err)
}

// expect an error if we use the stale value
wr3, err := storer.Reader(context.Background(), blobName, WithEtagMatch(*wr.ETag))
if err == nil {
t.Fatalf("updated content despite stale etag: %s", wr3.XMsErrorCode)
}
// check the error is exactly as we expect
if !ErrorFromError(err).IsConditionNotMet() {
t.Fatalf("expected ConditionNotMet err, got: %v", err)
}

_, err = storer.Put(
context.Background(), blobName, NewBytesReaderCloser(thirdValue), WithEtagMatch(*wr.ETag))
if err == nil {
t.Fatalf("overwrote second value with wrong etag")
}
_, err = storer.Put(
context.Background(), blobName, NewBytesReaderCloser(thirdValue), WithEtagMatch(*wr2.ETag))
if err != nil {
t.Fatalf("failed put third value: %v", err)
}
}

// Test_ReadIfNoneMatch tests the handling of the WitEtagNoneMatch option
func Test_ReadIfNoneMatch(t *testing.T) {

logger.New("NOOP")
defer logger.OnExit()

testName := uniqueTestName("ReadIfNoneMatch", t)

storer, err := NewDev(NewDevConfigFromEnv(), "devcontainer")
if err != nil {
t.Fatalf("failed to connect to blob store emulator: %v", err)
}
client := storer.GetServiceClient()
// This will error if it exists and that is fine
_, _ = client.CreateContainer(context.Background(), "devcontainer", nil)

blobName := fmt.Sprintf("%s-%s", testName, "blob")

originalValue := []byte("ORIGINAL_VALUE")
secondValue := []byte("SECOND_VALUE")

wr, err := storer.Put(context.Background(), blobName, NewBytesReaderCloser(originalValue))
if err != nil {
t.Fatalf("failed put original value: %v", err)
}

// change the value
wr2, err := storer.Put(
context.Background(), blobName, NewBytesReaderCloser(secondValue))
if err != nil {
t.Fatalf("failed put second value: %v", err)
}
logger.Sugar.Infof("%v", wr2.ETag)

// check we *fail* to get it when the matching etag is used
wr3, err := storer.Reader(context.Background(), blobName, WithEtagNoneMatch(*wr2.ETag))
if err != nil {
// For reads we _dont_ get an err (as we do for Put's), instead we have to examine the response.
t.Fatalf("error reading with stale etag: %v", err)
}
if !wr3.ConditionNotMet() {
t.Fatalf("expected ConditionNotMet")
}

// check we do get it when the stale etag is used
wr4, err := storer.Reader(context.Background(), blobName, WithEtagNoneMatch(*wr.ETag))
if err != nil {
t.Fatalf("failed to read fresh value predicated on stale etag: %v", err)
}
// Note: unless using the If- headers
if !wr4.Ok() {
t.Fatalf("expected Ok")
}

_, err = storer.Put(context.Background(), blobName, NewBytesReaderCloser(secondValue))
if err != nil {
t.Fatalf("failed put second value: %v", err)
}

_, err = storer.Put(context.Background(), blobName, NewBytesReaderCloser(originalValue))
if err != nil {
t.Fatalf("failed put original value: %v", err)
}
}
Loading

0 comments on commit 232f4a3

Please sign in to comment.