Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Put blob support, azureite local testing support #28

Merged
merged 1 commit into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.task/
.local/
3 changes: 3 additions & 0 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ includes:
taskfile: ./taskfiles/Taskfile_codeqa.yml
mocks:
taskfile: ./taskfiles/Taskfile_mocks.yml
azurite:
taskfile: ./taskfiles/Taskfile_azurite.yml
dir: ./taskfiles

tasks:

Expand Down
86 changes: 86 additions & 0 deletions azblob/put.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package azblob

import (
"context"
"fmt"
"io"

azStorageBlob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/rkvst/go-rkvstcommon/logger"
)

// Put creates or replaces a blob
// metadata and tags are set in the same operation as the content update.
func (azp *Storer) Put(
ctx context.Context,
identity string,
source io.ReadSeekCloser,
opts ...Option,
) (*WriteResponse, error) {
err := azp.checkContainer(ctx)
if err != nil {
return nil, err
}
logger.Sugar.Debugf("Create or replace BlockBlob %s", identity)

options := &StorerOptions{}
for _, opt := range opts {
opt(options)
}

_, err = azp.putBlob(
ctx, identity, source, options.leaseID, options.tags, options.metadata)
if err != nil {
return nil, err
}
return &WriteResponse{}, nil
}

// putBlob creates or replaces a blob. If the blob exists, any existing metdata
// is replaced in its entirity. It is an error if the seek position of the
// reader can't be set to zero
// ref: https://learn.microsoft.com/en-gb/rest/api/storageservices/put-blob?tabs=azure-ad
func (azp *Storer) putBlob(
ctx context.Context,
identity string,
body io.ReadSeekCloser,
leaseID string,
tags map[string]string,
metadata map[string]string,
) (*WriteResponse, error) {
logger.Sugar.Debugf("write %s", identity)

// The az sdk panics if this is not the case, we want an err
if pos, err := body.Seek(0, io.SeekCurrent); pos != 0 || err != nil {
return nil, fmt.Errorf("bad body for %s: %v", identity, ErrMustSupportSeek0)
}

blockBlobClient, err := azp.containerClient.NewBlockBlobClient(identity)
if err != nil {
logger.Sugar.Infof("Cannot get block blob client blob: %v", err)
return nil, ErrorFromError(err)
}
blobAccessConditions := azStorageBlob.BlobAccessConditions{
LeaseAccessConditions: &azStorageBlob.LeaseAccessConditions{},
ModifiedAccessConditions: &azStorageBlob.ModifiedAccessConditions{},
}
if leaseID != "" {
blobAccessConditions.LeaseAccessConditions.LeaseID = &leaseID
}

_, err = blockBlobClient.Upload(
ctx,
body,
&azStorageBlob.BlockBlobUploadOptions{
BlobAccessConditions: &blobAccessConditions,
Metadata: metadata,
TagsMap: tags,
},
)
if err != nil {
logger.Sugar.Infof("Cannot upload blob: %v", err)
return nil, ErrorFromError(err)

}
return &WriteResponse{}, nil
}
116 changes: 116 additions & 0 deletions azblob/storerazurite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package azblob

import (
"errors"
"fmt"
"os"

azStorageBlob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/rkvst/go-rkvstcommon/logger"
)

type DevConfig struct {
AccountName string
Key string
URL string
}

const (
// These constants are well known and described here:
// See: https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite

azureStorageAccountVar string = "AZURE_STORAGE_ACCOUNT"
azureStorageKeyVar string = "AZURE_STORAGE_KEY"
azuriteBlobEndpointURLVar string = "AZURITE_BLOB_ENDPOINT_URL"

azuriteWellKnownKey string = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
azuriteWellKnownAccount string = "devstoreaccount1"
azuriteWellKnownBlobEndpointURL string = "http://127.0.0.1:10000/"
azuriteResourceGroup string = "azurite-emulator"
azuriteSubscription string = "azurite-emulator"
)

// NewDevConfigFromEnv reads azurite (azure emulator) config from the standard
// azure env vars and falls back to the docmented defaults if they are not set.
// If overriding any settings via env, be sure to also configure
// AZURITE_ACCOUNTS for the emulator
// See: https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite
func NewDevConfigFromEnv() DevConfig {

// This is not for production, it is specifically for testing, hence the use
// of programed in defaults.
return DevConfig{
AccountName: devVarWithDefault(azureStorageAccountVar, azuriteWellKnownAccount),
Key: devVarWithDefault(azureStorageKeyVar, azuriteWellKnownKey),
URL: devVarWithDefault(azuriteBlobEndpointURLVar, azuriteWellKnownKey),
}
}

// GetContainerClient returns the underlying container client
func (s *Storer) GetContainerClient() *azStorageBlob.ContainerClient {
return s.containerClient
}

// GetServiceClient returns the underlying service client
func (s *Storer) GetServiceClient() *azStorageBlob.ServiceClient {
return s.serviceClient
}

// NewDev returns a normal blob client but connected for the azurite local
// emulator It uses the well known account name and key by default. If
// overriding, be sure to also configure AZURITE_ACCOUNTS for the emulator
// See: https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite
func NewDev(cfg DevConfig, container string) (*Storer, error) {
logger.Sugar.Infof(
"Attempt environment auth with accountName: %s, for container: %s",
cfg.AccountName, container,
)

if cfg.AccountName == "" || cfg.Key == "" || cfg.URL == "" {
return nil, errors.New("missing connection configuration variables")
}
cred, err := azStorageBlob.NewSharedKeyCredential(cfg.AccountName, cfg.Key)
if err != nil {
return nil, err
}

azp := &Storer{
AccountName: cfg.AccountName,
ResourceGroup: azuriteResourceGroup, // just for logging
Subscription: azuriteSubscription, // just for logging
Container: container,
credential: cred,
rootURL: cfg.URL,
}

azp.containerURL = fmt.Sprintf(
"%s%s",
cfg.URL,
container,
)
azp.serviceClient, err = azStorageBlob.NewServiceClientWithSharedKey(
cfg.URL,
cred,
nil,
)
if err != nil {
logger.Sugar.Infof("unable to create serviceclient %s: %v", azp.containerURL, err)
return nil, err
}
azp.containerClient, err = azp.serviceClient.NewContainerClient(container)
if err != nil {
logger.Sugar.Infof("unable to create containerclient %s: %v", container, err)
return nil, err
}

return azp, nil
}

// devVarWithDefault reads the key from env.
// If key is not set, returns the defaultValue.
func devVarWithDefault(key string, defaultValue string) string {
if value, ok := os.LookupEnv(key); ok {
return value
}
return defaultValue
}
22 changes: 12 additions & 10 deletions azblob/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -20,6 +21,10 @@ import (
"github.com/rkvst/go-rkvstcommon/logger"
)

var (
ErrMustSupportSeek0 = errors.New("must be seekable to position 0")
)

const (
chunkSize = 2 * 1024 * 1024
)
Expand Down Expand Up @@ -101,7 +106,7 @@ func (azp *Storer) Write(
opt(options)
}

_, err = azp.write(ctx, identity, source, options.leaseID)
_, err = azp.writeStream(ctx, identity, source, options.leaseID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -143,13 +148,11 @@ func (azp *Storer) WriteStream(
return azp.streamReader(ctx, identity, source, options)
}

func (azp *Storer) write(
func (azp *Storer) writeStream(
ctx context.Context,
identity string,
reader io.Reader,
leaseID string,
//tags map[string]string,
//metadata map[string]string,
) (*WriteResponse, error) {
logger.Sugar.Debugf("write %s", identity)
blockBlobClient, err := azp.containerClient.NewBlockBlobClient(identity)
Expand All @@ -164,18 +167,17 @@ func (azp *Storer) write(
if leaseID != "" {
blobAccessConditions.LeaseAccessConditions.LeaseID = &leaseID
}
// Writing tags and metadata should be possible using the fields in the options.
// Could not get it to work so am leaving commented out and will do explicit
// setTags and setMetadata calls instead in wrapping calls such as Write()

// Sream uploading does not support setting tags because the pages are
// uploaded in parallel and the tags can only be set once those pages block
// ids are commited. Use putBlob if you want this behaviour.
_, err = blockBlobClient.UploadStream(
ctx,
reader,
azStorageBlob.UploadStreamOptions{
BufferSize: chunkSize,
MaxBuffers: 3,
BlobAccessConditions: &blobAccessConditions,
//Metadata: metadata,
//BlobTagsMap: tags,
},
)
if err != nil {
Expand Down Expand Up @@ -291,7 +293,7 @@ func (azp *Storer) streamReader(
logger.Sugar.Debugf("Mime type is: %s", mimeType)

// prepare blob
resp, err = azp.write(ctx, identity, uploadData, options.leaseID)
resp, err = azp.writeStream(ctx, identity, uploadData, options.leaseID)
if err != nil {
return nil, err
}
Expand Down
75 changes: 75 additions & 0 deletions taskfiles/Taskfile_azurite.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
---
version: '3'

# Taskfile for working with dockerized azurite
#
# See: https://learn.microsoft.com/en-us/azure/storage/blobs/use-azurite-to-run-automated-tests
# Azurite supports local development and integration testing for services which
# use the message bus, blob store and ohter azure storage primitives.

vars:
# AZURITE_DATA_DIR the --location option for azurite is where data is persisted
AZURITE_DATA_DIR: '{{.AZURITE_DATA_DIR | default "../.local/azurite-data"}}'
AZURITE_BLOB_PORT: '{{.AZURITE_BLOB_PORT | default "10000"}}'
AZURITE_QUEUE_PORT: '{{.AZURITE_QUEUE_PORT | default "10001"}}'
AZURITE_TABLE_PORT: '{{.AZURITE_TABLE_PORT | default "11111"}}'
AZURITE_CONTAINER_NAME: '{{.AZURITE_CONTAINER_NAME | default "go-rkvstcommon-azurite"}}'
AZURITE_IMAGE: '{{.AZURITE_IMAGE | default "mcr.microsoft.com/azure-storage/azurite"}}'

tasks:
start:
desc: start azurite azure local storage emulator in a named docker container
summary: |
Starts the azure local storage emulator service in a docker container

The following env vars are respected for configuration

AZURITE_CONTAINER_NAME:
The container name to use, default "forestrie-azurite"
AZURITE_DATA_DIR:
Where the data is persisted, default ".local/azurite-data"
AZURITE_BLOB_PORT:
Blob service listening port, default "10000"
AZURITE_QUEUE_PORT:
Queue port, default "10001"
AZURITE_TABLE_PORT:
Table port, default "11111"
cmds:
- |
AZURITE_DATA_DIR=$(mkdir -p {{.AZURITE_DATA_DIR}} && cd {{.AZURITE_DATA_DIR}} && pwd)
echo "AZURITE_DATA_DIR: ${AZURITE_DATA_DIR}"
docker run \
--name {{.AZURITE_CONTAINER_NAME}} \
-p {{.AZURITE_BLOB_PORT}}:10000 \
-p {{.AZURITE_QUEUE_PORT}}:10001 \
-p {{.AZURITE_TABLE_PORT}}:11111 \
-dt -u $(id -u):$(id -g) \
--mount type=bind,src=${AZURITE_DATA_DIR},dst=/data \
{{.AZURITE_IMAGE}} \
{{.CLI_ARGS}}

stop:
desc: stop azurite azure local storage emulator docker container
summary: |
Stops the azure local storage emulator service
cmds:
- docker rm -f {{.AZURITE_CONTAINER_NAME}}

cleanup:
desc: |
stop the container and DELETE the data directory identified by AZURITE_DATA_DIR
summary: |
Stops the azure local storage emulator service
deps: [stop]
cmds:
- |
#
[[ -z "{{.AZURITE_DATA_DIR}}" ]] && exit 0

echo "deleting data at {{.AZURITE_DATA_DIR}}"
rm -vrf {{.AZURITE_DATA_DIR}}

logs:
desc: follow the logs of the azurite container
cmds:
- docker logs -f {{.AZURITE_CONTAINER_NAME}}
Loading