Skip to content

Commit

Permalink
Merge pull request #1228 from 0chain/sprint-1.10
Browse files Browse the repository at this point in the history
Sprint 1.10
  • Loading branch information
Kishan-Dhakan authored Sep 1, 2023
2 parents c37441a + e5dc06c commit 60317da
Show file tree
Hide file tree
Showing 23 changed files with 256 additions and 104 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build-&-publish-docker-image.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ env:

jobs:
blobber:
timeout-minutes: 25
runs-on: [self-hosted, arc-runner]
steps:
- name: Set docker image tag
Expand Down Expand Up @@ -95,6 +96,7 @@ jobs:
./docker.local/bin/build.blobber.sh
validator:
timeout-minutes: 20
runs-on: [self-hosted, arc-runner]
steps:
- name: Set docker image tag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ func (cc *AllocationChangeCollector) ApplyChanges(ctx context.Context, allocatio
return err
}
}
logging.Logger.Info("ApplyChanges", zap.Any("rootRef", rootRef))
_, err = rootRef.CalculateHash(ctx, true)
return err
}
Expand Down
42 changes: 39 additions & 3 deletions code/go/0chain.net/blobbercore/allocation/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
)

var (
Expand All @@ -15,18 +17,23 @@ var (

var (
connectionObjSizeMap = make(map[string]*ConnectionObjSize)
connectionObjMutex sync.Mutex
connectionObjMutex sync.RWMutex
)

type ConnectionObjSize struct {
Size int64
UpdatedAt time.Time
Changes map[string]*ConnectionChanges
}

type ConnectionChanges struct {
Hasher *filestore.CommitHasher
}

// GetConnectionObjSize gets the connection size from the memory
func GetConnectionObjSize(connectionID string) int64 {
connectionObjMutex.Lock()
defer connectionObjMutex.Unlock()
connectionObjMutex.RLock()
defer connectionObjMutex.RUnlock()
connectionObjSize := connectionObjSizeMap[connectionID]
if connectionObjSize == nil {
return 0
Expand All @@ -43,6 +50,7 @@ func UpdateConnectionObjSize(connectionID string, addSize int64) {
connectionObjSizeMap[connectionID] = &ConnectionObjSize{
Size: addSize,
UpdatedAt: time.Now(),
Changes: make(map[string]*ConnectionChanges),
}
return
}
Expand All @@ -51,6 +59,34 @@ func UpdateConnectionObjSize(connectionID string, addSize int64) {
connectionObjSize.UpdatedAt = time.Now()
}

func GetHasher(connectionID, pathHash string) *filestore.CommitHasher {
connectionObjMutex.RLock()
defer connectionObjMutex.RUnlock()
connectionObj := connectionObjSizeMap[connectionID]
if connectionObj == nil {
return nil
}
if connectionObj.Changes[pathHash] == nil {
return nil
}
return connectionObj.Changes[pathHash].Hasher
}

func UpdateConnectionObjWithHasher(connectionID, pathHash string, hasher *filestore.CommitHasher) {
connectionObjMutex.Lock()
defer connectionObjMutex.Unlock()
connectionObj := connectionObjSizeMap[connectionID]
if connectionObj == nil {
connectionObjSizeMap[connectionID] = &ConnectionObjSize{
UpdatedAt: time.Now(),
Changes: make(map[string]*ConnectionChanges),
}
}
connectionObjSizeMap[connectionID].Changes[pathHash] = &ConnectionChanges{
Hasher: hasher,
}
}

// DeleteConnectionObjEntry remove the connectionID entry from map
// If the given connectionID is not present, then it is no-op.
func DeleteConnectionObjEntry(connectionID string) {
Expand Down
15 changes: 8 additions & 7 deletions code/go/0chain.net/blobbercore/allocation/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ type Allocation struct {
RepairerID string `gorm:"column:repairer_id;size:64;not null"`
Expiration common.Timestamp `gorm:"column:expiration_date;not null"`
// AllocationRoot allcation_root of last write_marker
AllocationRoot string `gorm:"column:allocation_root;size:64;not null;default:''"`
FileMetaRoot string `gorm:"column:file_meta_root;size:64;not null;default:''"`
BlobberSize int64 `gorm:"column:blobber_size;not null;default:0"`
BlobberSizeUsed int64 `gorm:"column:blobber_size_used;not null;default:0"`
LatestRedeemedWM string `gorm:"column:latest_redeemed_write_marker;size:64"`
IsRedeemRequired bool `gorm:"column:is_redeem_required"`
TimeUnit time.Duration `gorm:"column:time_unit;not null;default:172800000000000"`
AllocationRoot string `gorm:"column:allocation_root;size:64;not null;default:''"`
FileMetaRoot string `gorm:"column:file_meta_root;size:64;not null;default:''"`
BlobberSize int64 `gorm:"column:blobber_size;not null;default:0"`
BlobberSizeUsed int64 `gorm:"column:blobber_size_used;not null;default:0"`
LatestRedeemedWM string `gorm:"column:latest_redeemed_write_marker;size:64"`
IsRedeemRequired bool `gorm:"column:is_redeem_required"`
TimeUnit time.Duration `gorm:"column:time_unit;not null;default:172800000000000"`
StartTime common.Timestamp `gorm:"column:start_time;not null"`
// Ending and cleaning
CleanedUp bool `gorm:"column:cleaned_up;not null;default:false"`
Finalized bool `gorm:"column:finalized;not null;default:false"`
Expand Down
10 changes: 8 additions & 2 deletions code/go/0chain.net/blobbercore/allocation/file_changer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
)

// BaseFileChanger base file change processor
Expand Down Expand Up @@ -44,8 +45,9 @@ type BaseFileChanger struct {
ThumbnailSize int64 `json:"thumbnail_size"`
ThumbnailFilename string `json:"thumbnail_filename"`

EncryptedKey string `json:"encrypted_key,omitempty"`
CustomMeta string `json:"custom_meta,omitempty"`
EncryptedKey string `json:"encrypted_key,omitempty"`
EncryptedKeyPoint string `json:"encrypted_key_point,omitempty"`
CustomMeta string `json:"custom_meta,omitempty"`

ChunkSize int64 `json:"chunk_size,omitempty"` // the size of achunk. 64*1024 is default
IsFinal bool `json:"is_final,omitempty"` // current chunk is last or not
Expand Down Expand Up @@ -92,6 +94,10 @@ func (fc *BaseFileChanger) CommitToFileStore(ctx context.Context) error {
fileInputData.ValidationRoot = fc.ValidationRoot
fileInputData.FixedMerkleRoot = fc.FixedMerkleRoot
fileInputData.ChunkSize = fc.ChunkSize
fileInputData.Hasher = GetHasher(fc.ConnectionID, encryption.Hash(fc.Path))
if fileInputData.Hasher == nil {
return common.NewError("invalid_parameters", "Invalid parameters. Error getting hasher for commit.")
}
_, err := filestore.GetFileStore().CommitWrite(fc.AllocationID, fc.ConnectionID, fileInputData)
if err != nil {
return common.NewError("file_store_error", "Error committing to file store. "+err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/gosdk/constants"
"github.com/0chain/gosdk/core/zcncrypto"
Expand Down Expand Up @@ -98,6 +99,9 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) {
ctx := datastore.GetStore().CreateTransaction(context.TODO())

fPath := "/new"
hasher := filestore.GetNewCommitHasher(2310)
pathHash := encryption.Hash(fPath)
UpdateConnectionObjWithHasher("connection_id", pathHash, hasher)
change := &UploadFileChanger{
BaseFileChanger: BaseFileChanger{
Filename: filepath.Base(fPath),
Expand All @@ -107,6 +111,7 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) {
ValidationRoot: tc.validationRoot,
Size: 2310,
ChunkSize: 65536,
ConnectionID: "connection_id",
},
}

Expand Down
1 change: 1 addition & 0 deletions code/go/0chain.net/blobbercore/allocation/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc
a.Finalized = sa.Finalized
a.TimeUnit = sa.TimeUnit
a.FileOptions = sa.FileOptions
a.StartTime = sa.StartTime

m := map[string]interface{}{
"allocation_id": a.ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/gosdk/core/zcncrypto"
"github.com/0chain/gosdk/zboxcore/client"
Expand Down Expand Up @@ -337,6 +338,9 @@ func TestBlobberCore_UpdateFile(t *testing.T) {
tc.setupDbMock()

ctx := datastore.GetStore().CreateTransaction(context.TODO())
hasher := filestore.GetNewCommitHasher(2310)
pathHash := encryption.Hash(tc.path)
UpdateConnectionObjWithHasher("connection_id", pathHash, hasher)

change := &UpdateFileChanger{
BaseFileChanger: BaseFileChanger{
Expand All @@ -352,6 +356,7 @@ func TestBlobberCore_UpdateFile(t *testing.T) {
ThumbnailSize: 92,
ChunkSize: 65536,
IsFinal: true,
ConnectionID: "connection_id",
},
}

Expand Down
39 changes: 18 additions & 21 deletions code/go/0chain.net/blobbercore/filestore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const (
)

func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, infile multipart.File) (*FileOutputData, error) {
tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, encryption.Hash(fileData.Path), conID)
tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, fileData.FilePathHash, conID)
var initialSize int64
finfo, err := os.Stat(tempFilePath)
if err != nil && !errors.Is(err, os.ErrNotExist) {
Expand All @@ -73,7 +73,7 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
return nil, common.NewError("dir_creation_error", err.Error())
}

f, err := os.OpenFile(tempFilePath, os.O_CREATE|os.O_WRONLY, 0644)
f, err := os.OpenFile(tempFilePath, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, common.NewError("file_open_error", err.Error())
}
Expand All @@ -83,8 +83,8 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
if err != nil {
return nil, common.NewError("file_seek_error", err.Error())
}

writtenSize, err := io.Copy(f, infile)
buf := make([]byte, BufferSize)
writtenSize, err := io.CopyBuffer(f, infile, buf)
if err != nil {
return nil, common.NewError("file_write_error", err.Error())
}
Expand All @@ -98,6 +98,17 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i

currentSize := finfo.Size()
if currentSize > initialSize { // Is chunk new or rewritten
if !fileData.IsThumbnail {
_, err = f.Seek(fileData.UploadOffset, io.SeekStart)
if err != nil {
return nil, common.NewError("file_seek_error", err.Error())
}

_, err = io.CopyBuffer(fileData.Hasher, f, buf)
if err != nil {
return nil, common.NewError("file_read_error", err.Error())
}
}
fileRef.ChunkUploaded = true
fs.updateAllocTempFileSize(allocID, currentSize-initialSize)
}
Expand Down Expand Up @@ -243,27 +254,18 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
}

fileSize := rStat.Size()
hasher := GetNewCommitHasher(fileSize)
bufSize := BufferSize
if fileSize < BufferSize {
bufSize = int(fileSize)
}
buffer := make([]byte, bufSize)
_, err = io.CopyBuffer(hasher, r, buffer)
if err != nil {
return false, common.NewError("read_write_error", err.Error())
}

err = hasher.Finalize()
if err != nil {
return false, common.NewError("finalize_error", err.Error())
}
fmtRootBytes, err := hasher.fmt.CalculateRootAndStoreNodes(f)
fmtRootBytes, err := fileData.Hasher.fmt.CalculateRootAndStoreNodes(f)
if err != nil {
return false, common.NewError("fmt_hash_calculation_error", err.Error())
}

validationRootBytes, err := hasher.vt.CalculateRootAndStoreNodes(f)
validationRootBytes, err := fileData.Hasher.vt.CalculateRootAndStoreNodes(f)
if err != nil {
return false, common.NewError("validation_hash_calculation_error", err.Error())
}
Expand All @@ -279,12 +281,7 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
"calculated validation root does not match with client's validation root")
}

_, err = r.Seek(0, io.SeekStart)
if err != nil {
return false, common.NewError("seek_error", err.Error())
}

_, err = io.Copy(f, r)
_, err = io.CopyBuffer(f, r, buffer)
if err != nil {
return false, common.NewError("write_error", err.Error())
}
Expand Down
6 changes: 4 additions & 2 deletions code/go/0chain.net/blobbercore/filestore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ type FileInputData struct {
//Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer.
UploadOffset int64
//IsFinal the request is final chunk
IsFinal bool
IsThumbnail bool
IsFinal bool
IsThumbnail bool
FilePathHash string
Hasher *CommitHasher
}

type FileOutputData struct {
Expand Down
Loading

0 comments on commit 60317da

Please sign in to comment.