Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sprint 1.10 #1228

Merged
merged 8 commits into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/build-&-publish-docker-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ env:

jobs:
blobber:
timeout-minutes: 25
runs-on: [self-hosted, arc-runner]
steps:
- name: Set docker image tag
Expand Down Expand Up @@ -95,6 +96,7 @@ jobs:
./docker.local/bin/build.blobber.sh

validator:
timeout-minutes: 20
runs-on: [self-hosted, arc-runner]
steps:
- name: Set docker image tag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ func (cc *AllocationChangeCollector) ApplyChanges(ctx context.Context, allocatio
return err
}
}
logging.Logger.Info("ApplyChanges", zap.Any("rootRef", rootRef))
_, err = rootRef.CalculateHash(ctx, true)
return err
}
Expand Down
42 changes: 39 additions & 3 deletions code/go/0chain.net/blobbercore/allocation/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
)

var (
Expand All @@ -15,18 +17,23 @@ var (

var (
connectionObjSizeMap = make(map[string]*ConnectionObjSize)
connectionObjMutex sync.Mutex
connectionObjMutex sync.RWMutex
)

type ConnectionObjSize struct {
Size int64
UpdatedAt time.Time
Changes map[string]*ConnectionChanges
}

type ConnectionChanges struct {
Hasher *filestore.CommitHasher
}

// GetConnectionObjSize gets the connection size from the memory
func GetConnectionObjSize(connectionID string) int64 {
connectionObjMutex.Lock()
defer connectionObjMutex.Unlock()
connectionObjMutex.RLock()
defer connectionObjMutex.RUnlock()
connectionObjSize := connectionObjSizeMap[connectionID]
if connectionObjSize == nil {
return 0
Expand All @@ -43,6 +50,7 @@ func UpdateConnectionObjSize(connectionID string, addSize int64) {
connectionObjSizeMap[connectionID] = &ConnectionObjSize{
Size: addSize,
UpdatedAt: time.Now(),
Changes: make(map[string]*ConnectionChanges),
}
return
}
Expand All @@ -51,6 +59,34 @@ func UpdateConnectionObjSize(connectionID string, addSize int64) {
connectionObjSize.UpdatedAt = time.Now()
}

func GetHasher(connectionID, pathHash string) *filestore.CommitHasher {
connectionObjMutex.RLock()
defer connectionObjMutex.RUnlock()
connectionObj := connectionObjSizeMap[connectionID]
if connectionObj == nil {
return nil
}
if connectionObj.Changes[pathHash] == nil {
return nil
}
return connectionObj.Changes[pathHash].Hasher
}

func UpdateConnectionObjWithHasher(connectionID, pathHash string, hasher *filestore.CommitHasher) {
connectionObjMutex.Lock()
defer connectionObjMutex.Unlock()
connectionObj := connectionObjSizeMap[connectionID]
if connectionObj == nil {
connectionObjSizeMap[connectionID] = &ConnectionObjSize{
UpdatedAt: time.Now(),
Changes: make(map[string]*ConnectionChanges),
}
}
connectionObjSizeMap[connectionID].Changes[pathHash] = &ConnectionChanges{
Hasher: hasher,
}
}

// DeleteConnectionObjEntry remove the connectionID entry from map
// If the given connectionID is not present, then it is no-op.
func DeleteConnectionObjEntry(connectionID string) {
Expand Down
15 changes: 8 additions & 7 deletions code/go/0chain.net/blobbercore/allocation/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ type Allocation struct {
RepairerID string `gorm:"column:repairer_id;size:64;not null"`
Expiration common.Timestamp `gorm:"column:expiration_date;not null"`
// AllocationRoot allcation_root of last write_marker
AllocationRoot string `gorm:"column:allocation_root;size:64;not null;default:''"`
FileMetaRoot string `gorm:"column:file_meta_root;size:64;not null;default:''"`
BlobberSize int64 `gorm:"column:blobber_size;not null;default:0"`
BlobberSizeUsed int64 `gorm:"column:blobber_size_used;not null;default:0"`
LatestRedeemedWM string `gorm:"column:latest_redeemed_write_marker;size:64"`
IsRedeemRequired bool `gorm:"column:is_redeem_required"`
TimeUnit time.Duration `gorm:"column:time_unit;not null;default:172800000000000"`
AllocationRoot string `gorm:"column:allocation_root;size:64;not null;default:''"`
FileMetaRoot string `gorm:"column:file_meta_root;size:64;not null;default:''"`
BlobberSize int64 `gorm:"column:blobber_size;not null;default:0"`
BlobberSizeUsed int64 `gorm:"column:blobber_size_used;not null;default:0"`
LatestRedeemedWM string `gorm:"column:latest_redeemed_write_marker;size:64"`
IsRedeemRequired bool `gorm:"column:is_redeem_required"`
TimeUnit time.Duration `gorm:"column:time_unit;not null;default:172800000000000"`
StartTime common.Timestamp `gorm:"column:start_time;not null"`
// Ending and cleaning
CleanedUp bool `gorm:"column:cleaned_up;not null;default:false"`
Finalized bool `gorm:"column:finalized;not null;default:false"`
Expand Down
10 changes: 8 additions & 2 deletions code/go/0chain.net/blobbercore/allocation/file_changer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
)

// BaseFileChanger base file change processor
Expand Down Expand Up @@ -44,8 +45,9 @@ type BaseFileChanger struct {
ThumbnailSize int64 `json:"thumbnail_size"`
ThumbnailFilename string `json:"thumbnail_filename"`

EncryptedKey string `json:"encrypted_key,omitempty"`
CustomMeta string `json:"custom_meta,omitempty"`
EncryptedKey string `json:"encrypted_key,omitempty"`
EncryptedKeyPoint string `json:"encrypted_key_point,omitempty"`
CustomMeta string `json:"custom_meta,omitempty"`

ChunkSize int64 `json:"chunk_size,omitempty"` // the size of achunk. 64*1024 is default
IsFinal bool `json:"is_final,omitempty"` // current chunk is last or not
Expand Down Expand Up @@ -92,6 +94,10 @@ func (fc *BaseFileChanger) CommitToFileStore(ctx context.Context) error {
fileInputData.ValidationRoot = fc.ValidationRoot
fileInputData.FixedMerkleRoot = fc.FixedMerkleRoot
fileInputData.ChunkSize = fc.ChunkSize
fileInputData.Hasher = GetHasher(fc.ConnectionID, encryption.Hash(fc.Path))
if fileInputData.Hasher == nil {
return common.NewError("invalid_parameters", "Invalid parameters. Error getting hasher for commit.")
}
_, err := filestore.GetFileStore().CommitWrite(fc.AllocationID, fc.ConnectionID, fileInputData)
if err != nil {
return common.NewError("file_store_error", "Error committing to file store. "+err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/gosdk/constants"
"github.com/0chain/gosdk/core/zcncrypto"
Expand Down Expand Up @@ -98,6 +99,9 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) {
ctx := datastore.GetStore().CreateTransaction(context.TODO())

fPath := "/new"
hasher := filestore.GetNewCommitHasher(2310)
pathHash := encryption.Hash(fPath)
UpdateConnectionObjWithHasher("connection_id", pathHash, hasher)
change := &UploadFileChanger{
BaseFileChanger: BaseFileChanger{
Filename: filepath.Base(fPath),
Expand All @@ -107,6 +111,7 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) {
ValidationRoot: tc.validationRoot,
Size: 2310,
ChunkSize: 65536,
ConnectionID: "connection_id",
},
}

Expand Down
1 change: 1 addition & 0 deletions code/go/0chain.net/blobbercore/allocation/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc
a.Finalized = sa.Finalized
a.TimeUnit = sa.TimeUnit
a.FileOptions = sa.FileOptions
a.StartTime = sa.StartTime

m := map[string]interface{}{
"allocation_id": a.ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/gosdk/core/zcncrypto"
"github.com/0chain/gosdk/zboxcore/client"
Expand Down Expand Up @@ -337,6 +338,9 @@ func TestBlobberCore_UpdateFile(t *testing.T) {
tc.setupDbMock()

ctx := datastore.GetStore().CreateTransaction(context.TODO())
hasher := filestore.GetNewCommitHasher(2310)
pathHash := encryption.Hash(tc.path)
UpdateConnectionObjWithHasher("connection_id", pathHash, hasher)

change := &UpdateFileChanger{
BaseFileChanger: BaseFileChanger{
Expand All @@ -352,6 +356,7 @@ func TestBlobberCore_UpdateFile(t *testing.T) {
ThumbnailSize: 92,
ChunkSize: 65536,
IsFinal: true,
ConnectionID: "connection_id",
},
}

Expand Down
39 changes: 18 additions & 21 deletions code/go/0chain.net/blobbercore/filestore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const (
)

func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, infile multipart.File) (*FileOutputData, error) {
tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, encryption.Hash(fileData.Path), conID)
tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, fileData.FilePathHash, conID)
var initialSize int64
finfo, err := os.Stat(tempFilePath)
if err != nil && !errors.Is(err, os.ErrNotExist) {
Expand All @@ -73,7 +73,7 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
return nil, common.NewError("dir_creation_error", err.Error())
}

f, err := os.OpenFile(tempFilePath, os.O_CREATE|os.O_WRONLY, 0644)
f, err := os.OpenFile(tempFilePath, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, common.NewError("file_open_error", err.Error())
}
Expand All @@ -83,8 +83,8 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
if err != nil {
return nil, common.NewError("file_seek_error", err.Error())
}

writtenSize, err := io.Copy(f, infile)
buf := make([]byte, BufferSize)
writtenSize, err := io.CopyBuffer(f, infile, buf)
if err != nil {
return nil, common.NewError("file_write_error", err.Error())
}
Expand All @@ -98,6 +98,17 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i

currentSize := finfo.Size()
if currentSize > initialSize { // Is chunk new or rewritten
if !fileData.IsThumbnail {
_, err = f.Seek(fileData.UploadOffset, io.SeekStart)
if err != nil {
return nil, common.NewError("file_seek_error", err.Error())
}

_, err = io.CopyBuffer(fileData.Hasher, f, buf)
if err != nil {
return nil, common.NewError("file_read_error", err.Error())
}
}
fileRef.ChunkUploaded = true
fs.updateAllocTempFileSize(allocID, currentSize-initialSize)
}
Expand Down Expand Up @@ -243,27 +254,18 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
}

fileSize := rStat.Size()
hasher := GetNewCommitHasher(fileSize)
bufSize := BufferSize
if fileSize < BufferSize {
bufSize = int(fileSize)
}
buffer := make([]byte, bufSize)
_, err = io.CopyBuffer(hasher, r, buffer)
if err != nil {
return false, common.NewError("read_write_error", err.Error())
}

err = hasher.Finalize()
if err != nil {
return false, common.NewError("finalize_error", err.Error())
}
fmtRootBytes, err := hasher.fmt.CalculateRootAndStoreNodes(f)
fmtRootBytes, err := fileData.Hasher.fmt.CalculateRootAndStoreNodes(f)
if err != nil {
return false, common.NewError("fmt_hash_calculation_error", err.Error())
}

validationRootBytes, err := hasher.vt.CalculateRootAndStoreNodes(f)
validationRootBytes, err := fileData.Hasher.vt.CalculateRootAndStoreNodes(f)
if err != nil {
return false, common.NewError("validation_hash_calculation_error", err.Error())
}
Expand All @@ -279,12 +281,7 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
"calculated validation root does not match with client's validation root")
}

_, err = r.Seek(0, io.SeekStart)
if err != nil {
return false, common.NewError("seek_error", err.Error())
}

_, err = io.Copy(f, r)
_, err = io.CopyBuffer(f, r, buffer)
if err != nil {
return false, common.NewError("write_error", err.Error())
}
Expand Down
6 changes: 4 additions & 2 deletions code/go/0chain.net/blobbercore/filestore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ type FileInputData struct {
//Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer.
UploadOffset int64
//IsFinal the request is final chunk
IsFinal bool
IsThumbnail bool
IsFinal bool
IsThumbnail bool
FilePathHash string
Hasher *CommitHasher
}

type FileOutputData struct {
Expand Down
Loading
Loading