Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev/robin/8656 blob etag support for forestrie #31

Merged
merged 4 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.task/
.local/
.vscode/
45 changes: 45 additions & 0 deletions azblob/accessconditions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package azblob

import (
"errors"

azStorageBlob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
)

// storerOptionConditions translates the lease, etag and since settings held in
// options into the azure blob access conditions used to predicate a request.
//
// The zero value is returned when no lease, no etag condition and no since
// condition is configured. An error is returned when an etag condition is
// requested but no etag value was supplied.
func storerOptionConditions(options *StorerOptions) (azStorageBlob.BlobAccessConditions, error) {
	var blobAccessConditions azStorageBlob.BlobAccessConditions

	// A since condition on its own must keep us off the "nothing requested"
	// fast path, otherwise it is silently dropped whenever neither a lease
	// nor an etag condition is in play.
	sinceRequested := options.sinceCondition == IfConditionModifiedSince ||
		options.sinceCondition == IfConditionUnmodifiedSince

	if options.leaseID == "" && options.etagCondition == EtagNotUsed && !sinceRequested {
		return blobAccessConditions, nil
	}
	if options.etag == "" && options.etagCondition != EtagNotUsed {
		return blobAccessConditions, errors.New("etag value missing")
	}

	if options.leaseID != "" {
		blobAccessConditions.LeaseAccessConditions = &azStorageBlob.LeaseAccessConditions{
			LeaseID: &options.leaseID,
		}
	}

	modified := &azStorageBlob.ModifiedAccessConditions{}

	// At most one etag style predicate is applied; the same option value
	// feeds whichever one was selected.
	switch options.etagCondition {
	case ETagMatch:
		modified.IfMatch = &options.etag
	case ETagNoneMatch:
		modified.IfNoneMatch = &options.etag
	case TagsWhere:
		modified.IfTags = &options.etag
	default:
	}

	switch options.sinceCondition {
	case IfConditionModifiedSince:
		modified.IfModifiedSince = options.since
	case IfConditionUnmodifiedSince:
		modified.IfUnmodifiedSince = options.since
	default:
	}

	blobAccessConditions.ModifiedAccessConditions = modified
	return blobAccessConditions, nil
}
20 changes: 20 additions & 0 deletions azblob/bytesreadercloser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package azblob

import "bytes"

// BytesSeekableReaderCloser provides a reader over an in-memory byte slice that
// has a no-op Close and a usable Seek. Because we need Seek, we can't use
// ioutil.NopCloser (or io.NopCloser), which only exposes Read and Close.
type BytesSeekableReaderCloser struct {
	*bytes.Reader
}

// NewBytesReaderCloser returns a BytesSeekableReaderCloser that reads from b.
func NewBytesReaderCloser(b []byte) *BytesSeekableReaderCloser {
	return &BytesSeekableReaderCloser{Reader: bytes.NewReader(b)}
}

// Close does nothing and always returns nil; the embedded bytes.Reader remains
// usable afterwards.
func (r *BytesSeekableReaderCloser) Close() error {
	return nil
}
68 changes: 29 additions & 39 deletions azblob/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,10 @@ import (
"fmt"
"io"
"net/http"
"net/textproto"
"strconv"

azStorageBlob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"

"github.com/rkvst/go-rkvstcommon/logger"
"github.com/rkvst/go-rkvstcommon/scannedstatus"
)

const (
Expand All @@ -39,6 +36,7 @@ func (azp *Storer) getTags(
logger.Sugar.Debugf("getTags BlockBlob Client %s error: %v", identity, err)
return nil, ErrorFromError(err)
}

resp, err := blobClient.GetTags(ctx, nil)
if err != nil {
logger.Sugar.Debugf("getTags BlockBlob URL %s error: %v", identity, err)
Expand Down Expand Up @@ -74,20 +72,6 @@ func (azp *Storer) getMetadata(
return resp.Metadata, nil
}

// ReaderResponse is the result type that Storer.Reader allocates, fills in and
// returns to its caller.
type ReaderResponse struct {
// Reader is assigned from the body of the blob download response.
Reader io.ReadCloser
// HashValue, MimeType and Size are taken from the blob's metadata.
HashValue string
MimeType string
Size int64
// Tags holds the blob tags when they were fetched.
Tags map[string]string
// TimestampAccepted and the Scanned* fields are taken from the blob's metadata.
TimestampAccepted string
ScannedStatus string
ScannedBadReason string
ScannedTimestamp string

BlobClient *azStorageBlob.BlobClient
}

// Reader creates a reader.
func (azp *Storer) Reader(
ctx context.Context,
Expand All @@ -105,13 +89,9 @@ func (azp *Storer) Reader(
logger.Sugar.Debugf("Reader BlockBlob URL %s", identity)

resp := &ReaderResponse{}
var blobAccessConditions azStorageBlob.BlobAccessConditions
if options.leaseID != "" {
blobAccessConditions = azStorageBlob.BlobAccessConditions{
LeaseAccessConditions: &azStorageBlob.LeaseAccessConditions{
LeaseID: &options.leaseID,
},
}
blobAccessConditions, err := storerOptionConditions(options)
if err != nil {
return nil, err
}

if len(options.tags) > 0 || options.getTags {
Expand All @@ -127,6 +107,9 @@ func (azp *Storer) Reader(
resp.Tags = tags
}

// XXX: TODO this should be done with access conditions. This is racy as it
// stands. Azure guarantees that the tags for a blob read after a write are
// consistent, but we can't take advantage of that while this remains racy.
for k, requiredValue := range options.tags {
blobValue, ok := resp.Tags[k]
if !ok {
Expand All @@ -139,7 +122,9 @@ func (azp *Storer) Reader(
}
}

if options.getMetadata == OnlyMetadata || options.getMetadata == BothMetadataAndBlob {
// If we are *only* getting metadata, issue a distinct request. Otherwise we
// get it from the download response.
if options.getMetadata == OnlyMetadata {
metaData, metadataErr := azp.getMetadata(
ctx,
identity,
Expand All @@ -148,19 +133,9 @@ func (azp *Storer) Reader(
logger.Sugar.Infof("cannot get metadata: %v", metadataErr)
return nil, metadataErr
}
logger.Sugar.Debugf("blob metadata %v", metaData)
size, parseErr := strconv.ParseInt(metaData[textproto.CanonicalMIMEHeaderKey(SizeKey)], 10, 64)
if parseErr != nil {
logger.Sugar.Infof("cannot get size value: %v", parseErr)
return nil, parseErr
if parseErr := readerResponseMetadata(resp, metaData); parseErr != nil {
return nil, err
}
resp.Size = size
resp.HashValue = metaData[textproto.CanonicalMIMEHeaderKey(HashKey)]
resp.MimeType = metaData[textproto.CanonicalMIMEHeaderKey(MimeKey)]
resp.TimestampAccepted = metaData[textproto.CanonicalMIMEHeaderKey(TimeKey)]
resp.ScannedStatus = scannedstatus.FromString(metaData[textproto.CanonicalMIMEHeaderKey(scannedstatus.Key)]).String()
resp.ScannedBadReason = metaData[textproto.CanonicalMIMEHeaderKey(scannedstatus.BadReason)]
resp.ScannedTimestamp = metaData[textproto.CanonicalMIMEHeaderKey(scannedstatus.Timestamp)]
}

if options.getMetadata == OnlyMetadata {
Expand All @@ -180,12 +155,27 @@ func (azp *Storer) Reader(
Count: &countToEnd,
},
)

if err != nil && err == io.EOF { // nolint
logger.Sugar.Infof("cannot get blob body: %v", err)
return nil, ErrorFromError(err)
}
resp.Reader = get.Body(nil)
return resp, nil

normaliseReaderResponseErr(err, resp)
if err == nil {
// We *always* copy the metadata into the response
downloadReaderResponse(get, resp)

// for backwards compat, we only process the metadata on request
if options.getMetadata == BothMetadataAndBlob {
_ = readerResponseMetadata(resp, resp.Metadata) // the parse error is benign
}
}

if get.RawResponse != nil {
resp.Reader = get.Body(nil)
}
return resp, err
}

func (r *ReaderResponse) DownloadToWriter(w io.Writer) error {
Expand Down
17 changes: 17 additions & 0 deletions azblob/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,20 @@ func (e *Error) StatusCode() int {
logger.Sugar.Debugf("Return InternalServerError")
return http.StatusInternalServerError
}

// StorageErrorCode reports the azure storage ErrorCode carried by the wrapped
// error, eg "BlobNotFound". The empty string is returned when the wrapped
// error is not (and does not wrap) a storage error, or when it has no code.
func (e *Error) StorageErrorCode() string {
	var storageErr *azStorageBlob.StorageError
	if !errors.As(e.err, &storageErr) {
		return ""
	}
	// An unset code converts to "", the same result as the not-found case.
	return string(storageErr.ErrorCode)
}

// IsConditionNotMet reports whether the error carries the storage error code
// indicating that an If- header predicate (eg ETag) was not met.
func (e *Error) IsConditionNotMet() bool {
	conditionNotMet := string(azStorageBlob.StorageErrorCodeConditionNotMet)
	return e.StorageErrorCode() == conditionNotMet
}
154 changes: 154 additions & 0 deletions azblob/etags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package azblob

import (
"context"
"fmt"
"testing"

"github.com/google/uuid"
"github.com/rkvst/go-rkvstcommon/logger"
)

// uniqueTestName returns testName suffixed with "-" and a freshly generated
// random UUID, so that names derived from it differ between test runs. The
// test is failed immediately if a UUID cannot be generated.
func uniqueTestName(testName string, t *testing.T) string {
	// Attribute any failure to the calling test rather than to this helper.
	t.Helper()

	uid, err := uuid.NewRandom()
	if err != nil {
		// Fatalf stops the test goroutine, so no fallback return is needed.
		t.Fatalf("%v", err)
	}
	return fmt.Sprintf("%s-%s", testName, uid.String())
}

// Tests covering the setting and behaviour of etags. Requires the azurite emulator to be running

// TestPutIfMatch checks the handling of WithEtagMatch, which is typically used
// to reconcile racing updates by guaranteeing a single winner.
//
// It writes a blob three times: each conditional write or read must succeed
// when given the etag of the current content, and must fail when given the
// etag of content that has since been replaced.
func TestPutIfMatch(t *testing.T) {

	logger.New("NOOP")
	defer logger.OnExit()

	ctx := context.Background()
	testName := uniqueTestName("PutIfMatch", t)

	storer, err := NewDev(NewDevConfigFromEnv(), "devcontainer")
	if err != nil {
		t.Fatalf("failed to connect to blob store emulator: %v", err)
	}
	client := storer.GetServiceClient()
	// This will error if the container already exists and that is fine
	if _, err = client.CreateContainer(ctx, "devcontainer", nil); err != nil {
		logger.Sugar.Infof("benign err: %v, %s", err, err.Error())
	}

	blobName := fmt.Sprintf("tests/blobs/%s-%d", testName, 1)

	originalValue := []byte("ORIGINAL_VALUE")
	secondValue := []byte("SECOND_VALUE")
	thirdValue := []byte("THIRD_VALUE")

	// establish the original value
	wr, err := storer.Put(ctx, blobName, NewBytesReaderCloser(originalValue))
	if err != nil {
		t.Fatalf("failed put original value: %v", err)
	}
	// Fail cleanly rather than panic on the dereferences below.
	if wr.ETag == nil {
		t.Fatalf("put of original value returned no etag")
	}

	// put the updated value only if we match the original value
	wr2, err := storer.Put(
		ctx, blobName, NewBytesReaderCloser(secondValue), WithEtagMatch(*wr.ETag))
	if err != nil {
		t.Fatalf("failed put second value: %v", err)
	}
	if wr2.ETag == nil {
		t.Fatalf("put of second value returned no etag")
	}

	// read back only if it matches the new value
	_, err = storer.Reader(ctx, blobName, WithEtagMatch(*wr2.ETag))
	if err != nil {
		t.Fatalf("failed to read value with updated ETag: %v", err)
	}

	// expect an error if we read using the stale etag
	wr3, err := storer.Reader(ctx, blobName, WithEtagMatch(*wr.ETag))
	if err == nil {
		t.Fatalf("read succeeded despite stale etag: %s", wr3.XMsErrorCode)
	}
	// check the error is exactly as we expect
	if !ErrorFromError(err).IsConditionNotMet() {
		t.Fatalf("expected ConditionNotMet err, got: %v", err)
	}

	// a write predicated on the stale etag must not replace the second value
	_, err = storer.Put(
		ctx, blobName, NewBytesReaderCloser(thirdValue), WithEtagMatch(*wr.ETag))
	if err == nil {
		t.Fatalf("overwrote second value with wrong etag")
	}
	// whereas one predicated on the current etag must succeed
	_, err = storer.Put(
		ctx, blobName, NewBytesReaderCloser(thirdValue), WithEtagMatch(*wr2.ETag))
	if err != nil {
		t.Fatalf("failed put third value: %v", err)
	}
}

// Test_ReadIfNoneMatch tests the handling of the WithEtagNoneMatch option.
//
// A read predicated on "none match" with the etag of the current content is
// expected to report ConditionNotMet on the response, while the same read with
// the etag of replaced content is expected to report Ok.
func Test_ReadIfNoneMatch(t *testing.T) {

	logger.New("NOOP")
	defer logger.OnExit()

	ctx := context.Background()
	testName := uniqueTestName("ReadIfNoneMatch", t)

	storer, err := NewDev(NewDevConfigFromEnv(), "devcontainer")
	if err != nil {
		t.Fatalf("failed to connect to blob store emulator: %v", err)
	}
	client := storer.GetServiceClient()
	// This will error if the container already exists and that is fine
	_, _ = client.CreateContainer(ctx, "devcontainer", nil)

	blobName := fmt.Sprintf("%s-%s", testName, "blob")

	originalValue := []byte("ORIGINAL_VALUE")
	secondValue := []byte("SECOND_VALUE")

	wr, err := storer.Put(ctx, blobName, NewBytesReaderCloser(originalValue))
	if err != nil {
		t.Fatalf("failed put original value: %v", err)
	}
	// Fail cleanly rather than panic on the dereferences below.
	if wr.ETag == nil {
		t.Fatalf("put of original value returned no etag")
	}

	// change the value
	wr2, err := storer.Put(
		ctx, blobName, NewBytesReaderCloser(secondValue))
	if err != nil {
		t.Fatalf("failed put second value: %v", err)
	}
	if wr2.ETag == nil {
		t.Fatalf("put of second value returned no etag")
	}
	logger.Sugar.Infof("%v", wr2.ETag)

	// check we *fail* to get it when the matching (current) etag is used
	wr3, err := storer.Reader(ctx, blobName, WithEtagNoneMatch(*wr2.ETag))
	if err != nil {
		// For this read the unmet condition is not expected to surface as an
		// err (as it does for Put); instead we examine the response.
		t.Fatalf("error reading with current etag: %v", err)
	}
	if !wr3.ConditionNotMet() {
		t.Fatalf("expected ConditionNotMet")
	}

	// check we do get it when the stale etag is used
	wr4, err := storer.Reader(ctx, blobName, WithEtagNoneMatch(*wr.ETag))
	if err != nil {
		t.Fatalf("failed to read fresh value predicated on stale etag: %v", err)
	}
	// the stale etag does not match the current content, so the condition holds
	if !wr4.Ok() {
		t.Fatalf("expected Ok")
	}

	// unconditional writes must still succeed after the conditional reads
	_, err = storer.Put(ctx, blobName, NewBytesReaderCloser(secondValue))
	if err != nil {
		t.Fatalf("failed put second value: %v", err)
	}

	_, err = storer.Put(ctx, blobName, NewBytesReaderCloser(originalValue))
	if err != nil {
		t.Fatalf("failed put original value: %v", err)
	}
}
Loading
Loading