From 543e59b8d062cd8e9c9c998441304eaac6d83303 Mon Sep 17 00:00:00 2001 From: din-mukhammed Date: Tue, 15 Aug 2023 04:36:54 +0600 Subject: [PATCH 1/9] remove unnecessary print --- zboxcore/sdk/allocation.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 72e527244..b7198a1f8 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -543,7 +543,7 @@ func (a *Allocation) StartMultiUpload(workdir string, localPaths []string, fileN if err != nil { return err } - fmt.Println("fullRemotepath and localpath", fullRemotePath, localPath) + fileMeta := FileMeta{ Path: localPath, ActualSize: fileInfo.Size(), From 5deca5284ac21946c3f73ac7c0ace7245d0b04df Mon Sep 17 00:00:00 2001 From: din-mukhammed Date: Thu, 17 Aug 2023 01:25:54 +0600 Subject: [PATCH 2/9] fix delete and createdir --- zboxcore/sdk/allocation.go | 1 + zboxcore/sdk/commitworker.go | 7 ++++- zboxcore/sdk/deleteworker.go | 2 +- zboxcore/sdk/dirworker.go | 54 +++++++++++++++++++++--------------- 4 files changed, 39 insertions(+), 25 deletions(-) diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index b7198a1f8..3637247ea 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -386,6 +386,7 @@ func (a *Allocation) CreateDir(remotePath string) error { remotePath: remotePath, wg: &sync.WaitGroup{}, timestamp: timestamp, + alreadyExists: map[uint64]bool{}, Consensus: Consensus{ RWMutex: &sync.RWMutex{}, consensusThresh: a.consensusThreshold, diff --git a/zboxcore/sdk/commitworker.go b/zboxcore/sdk/commitworker.go index 9c6189275..81dab2470 100644 --- a/zboxcore/sdk/commitworker.go +++ b/zboxcore/sdk/commitworker.go @@ -101,10 +101,15 @@ func (commitreq *CommitRequest) processCommit() { var req *http.Request var lR ReferencePathResult req, err := zboxutil.NewReferencePathRequest(commitreq.blobber.Baseurl, commitreq.allocationID, commitreq.allocationTx, paths) - if err != nil || len(paths) == 0 { + if err != nil { l.Logger.Error("Creating ref path req", err) return } + if len(paths) == 0 { + l.Logger.Info("Nothing to commit") + commitreq.result = SuccessCommitResult() + return + } ctx, cncl := context.WithTimeout(context.Background(), (time.Second * 30)) err = zboxutil.HttpDo(ctx, cncl, req, func(resp *http.Response, err error) error { diff --git a/zboxcore/sdk/deleteworker.go b/zboxcore/sdk/deleteworker.go index f7dd01a1a..9e8c2330e 100644 --- a/zboxcore/sdk/deleteworker.go +++ b/zboxcore/sdk/deleteworker.go @@ -357,7 +357,7 @@ func (dop *DeleteOperation) Process(allocObj *Allocation, connectionID string) ( fmt.Sprintf("Delete failed. Required consensus %d, got %d", deleteReq.consensus.consensusThresh, deleteReq.consensus.consensus)) } - l.Logger.Info("Delete Processs Ended ") + l.Logger.Info("Delete Process Ended ") return objectTreeRefs, deleteReq.deleteMask, nil } diff --git a/zboxcore/sdk/dirworker.go b/zboxcore/sdk/dirworker.go index 60e63bd3a..c19ba468e 100644 --- a/zboxcore/sdk/dirworker.go +++ b/zboxcore/sdk/dirworker.go @@ -42,6 +42,7 @@ type DirRequest struct { mu *sync.Mutex connectionID string timestamp int64 + alreadyExists map[uint64]bool Consensus } @@ -63,6 +64,7 @@ func (req *DirRequest) ProcessWithBlobbers(a *Allocation) int { } if alreadyExists { countMu.Lock() + req.alreadyExists[pos] = true existingDirCount++ countMu.Unlock() } @@ -258,9 +260,6 @@ func (req *DirRequest) createDirInBlobber(blobber *blockchain.StorageNode, pos u l.Logger.Error(blobber.Baseurl, " Response: ", msg) if strings.Contains(msg, DirectoryExists) { req.Consensus.Done() - req.mu.Lock() - req.dirMask = req.dirMask.And(zboxutil.NewUint128(1).Lsh(pos).Not()) - req.mu.Unlock() alreadyExists = true return } @@ -286,11 +285,12 @@ func (req *DirRequest) createDirInBlobber(blobber *blockchain.StorageNode, pos u } type DirOperation struct { - remotePath string - ctx context.Context - ctxCncl context.CancelFunc - dirMask zboxutil.Uint128 - maskMU *sync.Mutex + remotePath string + ctx context.Context + ctxCncl context.CancelFunc + dirMask zboxutil.Uint128 + maskMU *sync.Mutex + alreadyExists map[uint64]bool Consensus } @@ -298,16 +298,17 @@ type DirOperation struct { func (dirOp *DirOperation) Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error) { refs := make([]fileref.RefEntity, len(allocObj.Blobbers)) dR := &DirRequest{ - allocationID: allocObj.ID, - allocationTx: allocObj.Tx, - connectionID: connectionID, - blobbers: allocObj.Blobbers, - remotePath: dirOp.remotePath, - ctx: dirOp.ctx, - ctxCncl: dirOp.ctxCncl, - dirMask: dirOp.dirMask, - mu: dirOp.maskMU, - wg: &sync.WaitGroup{}, + allocationID: allocObj.ID, + allocationTx: allocObj.Tx, + connectionID: connectionID, + blobbers: allocObj.Blobbers, + remotePath: dirOp.remotePath, + ctx: dirOp.ctx, + ctxCncl: dirOp.ctxCncl, + dirMask: dirOp.dirMask, + mu: dirOp.maskMU, + wg: &sync.WaitGroup{}, + alreadyExists: make(map[uint64]bool), } dR.Consensus = Consensus{ RWMutex: &sync.RWMutex{}, @@ -316,6 +317,7 @@ func (dirOp *DirOperation) Process(allocObj *Allocation, connectionID string) ([ } _ = dR.ProcessWithBlobbers(allocObj) + dirOp.alreadyExists = dR.alreadyExists if !dR.isConsensusOk() { return nil, dR.dirMask, errors.New("consensus_not_met", "directory creation failed due to consensus not met") @@ -330,12 +332,17 @@ func (dirOp *DirOperation) buildChange(refs []fileref.RefEntity, uid uuid.UUID) changes := make([]allocationchange.AllocationChange, len(refs)) for i := dirOp.dirMask; !i.Equals(zboxutil.NewUint128(0)); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) { pos = uint64(i.TrailingZeros()) - newChange := &allocationchange.DirCreateChange{ - RemotePath: dirOp.remotePath, - Uuid: uid, - Timestamp: common.Now(), + if dirOp.alreadyExists[pos] { + newChange := &allocationchange.EmptyFileChange{} + changes[pos] = newChange + } else { + newChange := &allocationchange.DirCreateChange{ + RemotePath: dirOp.remotePath, + Uuid: uid, + Timestamp: common.Now(), + } + changes[pos] = newChange } - changes[pos] = newChange } return changes } @@ -367,5 +374,6 @@ func NewDirOperation(remotePath string, dirMask zboxutil.Uint128, maskMU *sync.M dirOp.consensusThresh = consensusTh dirOp.fullconsensus = fullConsensus dirOp.ctx, dirOp.ctxCncl = context.WithCancel(ctx) + dirOp.alreadyExists = make(map[uint64]bool) return dirOp } From 8c9cef8c2a55094879685c0461203119a4ae0d10 Mon Sep 17 00:00:00 2001 From: din-mukhammed Date: Sat, 26 Aug 2023 17:56:55 +0600 Subject: [PATCH 3/9] return major error --- zboxcore/sdk/allocation.go | 10 +++++++++- zboxcore/sdk/multi_operation_worker.go | 19 +++++++++---------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 21c3c8214..37da50bfa 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -832,6 +832,7 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest) error { mo.connectionID = zboxutil.NewConnectionId() previousPaths := make(map[string]bool) + connectionErrors := make([]error, len(mo.allocationObj.Blobbers)) var wg sync.WaitGroup for blobberIdx := range mo.allocationObj.Blobbers { @@ -841,14 +842,21 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest) error { err := mo.createConnectionObj(pos) if err != nil { l.Logger.Error(err.Error()) + connectionErrors[pos] = err } }(blobberIdx) } wg.Wait() // Check consensus if mo.operationMask.CountOnes() < mo.consensusThresh { + majorErr := zboxutil.MajorError(connectionErrors) + if majorErr != nil { + return errors.New("consensus_not_met", + fmt.Sprintf("Multioperation (create_connection) failed. Required consensus %d got %d. Major error: %s", + mo.consensusThresh, mo.operationMask.CountOnes(), majorErr.Error())) + } return errors.New("consensus_not_met", - fmt.Sprintf("Multioperation failed. Required consensus %d got %d", + fmt.Sprintf("Multioperation (create_connection) failed. Required consensus %d got %d", mo.consensusThresh, mo.operationMask.CountOnes())) } diff --git a/zboxcore/sdk/multi_operation_worker.go b/zboxcore/sdk/multi_operation_worker.go index 36079c5d3..c351042c2 100644 --- a/zboxcore/sdk/multi_operation_worker.go +++ b/zboxcore/sdk/multi_operation_worker.go @@ -151,7 +151,7 @@ func (mo *MultiOperation) Process() error { ctxCncl := mo.ctxCncl defer ctxCncl() - errs := make(chan error, 1) + errsSlice := make([]error, len(mo.operations)) mo.operationMask = zboxutil.NewUint128(0) for idx, op := range mo.operations { uid := util.GetNewUUID() @@ -169,13 +169,8 @@ func (mo *MultiOperation) Process() error { refs, mask, err := op.Process(mo.allocationObj, mo.connectionID) // Process with each blobber if err != nil { l.Logger.Error(err) - - select { - case errs <- errors.New("", err.Error()): - default: - } + errsSlice[idx] = errors.New("", err.Error()) ctxCncl() - return } mo.maskMU.Lock() @@ -187,11 +182,15 @@ func (mo *MultiOperation) Process() error { }(op, idx) } wg.Wait() - if ctx.Err() != nil { - return <-errs - } + // Check consensus if mo.operationMask.CountOnes() < mo.consensusThresh { + majorErr := zboxutil.MajorError(errsSlice) + if majorErr != nil { + return errors.New("consensus_not_met", + fmt.Sprintf("Multioperation failed. Required consensus %d got %d. Major error: %s", + mo.consensusThresh, mo.operationMask.CountOnes(), majorErr.Error())) + } return errors.New("consensus_not_met", fmt.Sprintf("Multioperation failed. Required consensus %d got %d", mo.consensusThresh, mo.operationMask.CountOnes())) From 6daa75c59be3a1f09e6fba460355ac2414186edb Mon Sep 17 00:00:00 2001 From: din-mukhammed Date: Sat, 26 Aug 2023 19:23:31 +0600 Subject: [PATCH 4/9] add webstreaming --- mobilesdk/zbox/storage.go | 23 ++++++++++++++--------- winsdk/models.go | 7 ++++--- winsdk/storage.go | 19 +++++++++++-------- zboxcore/sdk/allocation.go | 22 +++++++++++++--------- zboxcore/sdk/upload_worker.go | 6 ++++-- 5 files changed, 46 insertions(+), 31 deletions(-) diff --git a/mobilesdk/zbox/storage.go b/mobilesdk/zbox/storage.go index 2dc193193..dc043cd3b 100644 --- a/mobilesdk/zbox/storage.go +++ b/mobilesdk/zbox/storage.go @@ -26,13 +26,14 @@ type MultiOperationOption struct { } type MultiUploadOption struct { - FilePath string `json:"filePath,omitempty"` - FileName string `json:"fileName,omitempty"` - RemotePath string `json:"remotePath,omitempty"` - ThumbnailPath string `json:"thumbnailPath,omitempty"` - Encrypt bool `json:"encrypt,omitempty"` - ChunkNumber int `json:"chunkNumber,omitempty"` - IsUpdate bool `json:"isUpdate,omitempty"` + FilePath string `json:"filePath,omitempty"` + FileName string `json:"fileName,omitempty"` + RemotePath string `json:"remotePath,omitempty"` + ThumbnailPath string `json:"thumbnailPath,omitempty"` + Encrypt bool `json:"encrypt,omitempty"` + ChunkNumber int `json:"chunkNumber,omitempty"` + IsUpdate bool `json:"isUpdate,omitempty"` + IsWebstreaming bool `json:"isWebstreaming,omitempty"` } type MultiDownloadOption struct { @@ -336,6 +337,7 @@ func MultiUpload(allocationID string, workdir string, jsonMultiUploadOptions str encrypts := make([]bool, totalUploads) chunkNumbers := make([]int, totalUploads) isUpdates := make([]bool, totalUploads) + isWebstreaming := make([]bool, totalUploads) for idx, option := range options { filePaths[idx] = option.FilePath fileNames[idx] = option.FileName @@ -344,13 +346,14 @@ func MultiUpload(allocationID string, workdir string, jsonMultiUploadOptions str chunkNumbers[idx] = option.ChunkNumber encrypts[idx] = option.Encrypt isUpdates[idx] = false + isWebstreaming[idx] = option.IsWebstreaming } a, err := getAllocation(allocationID) if err != nil { return err } - return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, &StatusCallbackWrapped{Callback: statusCb}) + return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, isWebstreaming, &StatusCallbackWrapped{Callback: statusCb}) } @@ -373,6 +376,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str encrypts := make([]bool, totalUploads) chunkNumbers := make([]int, totalUploads) isUpdates := make([]bool, totalUploads) + isWebstreming := make([]bool, totalUploads) for idx, option := range options { filePaths[idx] = option.FilePath fileNames[idx] = option.FileName @@ -381,6 +385,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str chunkNumbers[idx] = option.ChunkNumber encrypts[idx] = option.Encrypt isUpdates[idx] = true + isWebstreming[idx] = option.IsWebstreaming } if err != nil { return err @@ -390,7 +395,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str if err != nil { return err } - return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, &StatusCallbackWrapped{Callback: statusCb}) + return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, isWebstreming, &StatusCallbackWrapped{Callback: statusCb}) } diff --git a/winsdk/models.go b/winsdk/models.go index 8fb2723d7..ef86b5238 100644 --- a/winsdk/models.go +++ b/winsdk/models.go @@ -5,9 +5,10 @@ type UploadFile struct { Path string ThumbnailPath string - RemotePath string - Encrypt bool - IsUpdate bool + RemotePath string + Encrypt bool + IsUpdate bool + IsWebstreaming bool ChunkNumber int } diff --git a/winsdk/storage.go b/winsdk/storage.go index 00ffb40ad..005c3798f 100644 --- a/winsdk/storage.go +++ b/winsdk/storage.go @@ -163,13 +163,14 @@ type MultiOperationOption struct { } type MultiUploadOption struct { - FilePath string `json:"filePath,omitempty"` - FileName string `json:"fileName,omitempty"` - RemotePath string `json:"remotePath,omitempty"` - ThumbnailPath string `json:"thumbnailPath,omitempty"` - Encrypt bool `json:"encrypt,omitempty"` - ChunkNumber int `json:"chunkNumber,omitempty"` - IsUpdate bool `json:"isUpdate,omitempty"` + FilePath string `json:"filePath,omitempty"` + FileName string `json:"fileName,omitempty"` + RemotePath string `json:"remotePath,omitempty"` + ThumbnailPath string `json:"thumbnailPath,omitempty"` + Encrypt bool `json:"encrypt,omitempty"` + ChunkNumber int `json:"chunkNumber,omitempty"` + IsUpdate bool `json:"isUpdate,omitempty"` + IsWebstreaming bool `json:"isWebstreaming,omitempty"` } // MultiOperation - do copy, move, delete and createdir operation together @@ -323,6 +324,7 @@ func BulkUpload(uploadID, allocationID, files *C.char) *C.char { chunkNumbers := make([]int, totalUploads) encrypts := make([]bool, totalUploads) isUpdates := make([]bool, totalUploads) + isWebstreaming := make([]bool, totalUploads) statusBar := &StatusCallback{ status: make(map[string]*Status), @@ -335,6 +337,7 @@ func BulkUpload(uploadID, allocationID, files *C.char) *C.char { remotePaths[idx] = option.RemotePath chunkNumbers[idx] = option.ChunkNumber isUpdates[idx] = option.IsUpdate + isWebstreaming[idx] = option.IsWebstreaming encrypts[idx] = option.Encrypt statusBar.status[option.RemotePath+option.Name] = &Status{} } @@ -346,7 +349,7 @@ func BulkUpload(uploadID, allocationID, files *C.char) *C.char { statusCaches.Add(C.GoString(uploadID), statusBar) - err = a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, statusBar) + err = a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, isWebstreaming, statusBar) if err != nil { return WithJSON(nil, err) } diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 809febafc..30b84d973 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -208,12 +208,13 @@ type Allocation struct { } type OperationRequest struct { - OperationType string - LocalPath string - RemotePath string - DestName string // Required only for rename operation - DestPath string // Required for copy and move operation - IsUpdate bool + OperationType string + LocalPath string + RemotePath string + DestName string // Required only for rename operation + DestPath string // Required for copy and move operation + IsUpdate bool + IsWebstreaming bool // Required for uploads Workdir string @@ -495,7 +496,7 @@ func (a *Allocation) EncryptAndUploadFileWithThumbnail( ) } -func (a *Allocation) StartMultiUpload(workdir string, localPaths []string, fileNames []string, thumbnailPaths []string, encrypts []bool, chunkNumbers []int, remotePaths []string, isUpdate []bool, status StatusCallback) error { +func (a *Allocation) StartMultiUpload(workdir string, localPaths []string, fileNames []string, thumbnailPaths []string, encrypts []bool, chunkNumbers []int, remotePaths []string, isUpdate []bool, isWebstreaming []bool, status StatusCallback) error { if len(localPaths) != len(thumbnailPaths) { return errors.New("invalid_value", "length of localpaths and thumbnailpaths must be equal") } @@ -588,6 +589,9 @@ func (a *Allocation) StartMultiUpload(workdir string, localPaths []string, fileN if isUpdate[idx] { operationRequests[idx].OperationType = constants.FileOperationUpdate } + if isWebstreaming[idx] { + operationRequests[idx].IsWebstreaming = true + } } err := a.DoMultiOperation(operationRequests) @@ -873,13 +877,13 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest) error { operation = NewMoveOperation(op.RemotePath, op.DestPath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx) case constants.FileOperationInsert: - operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, false, op.Opts...) + operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, false, op.IsWebstreaming, op.Opts...) case constants.FileOperationDelete: operation = NewDeleteOperation(op.RemotePath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx) case constants.FileOperationUpdate: - operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, true, op.Opts...) + operation = NewUploadOperation(op.Workdir, op.FileMeta, op.FileReader, true, op.IsWebstreaming, op.Opts...) case constants.FileOperationCreateDir: operation = NewDirOperation(op.RemotePath, mo.operationMask, mo.maskMU, mo.consensusThresh, mo.fullconsensus, mo.ctx) diff --git a/zboxcore/sdk/upload_worker.go b/zboxcore/sdk/upload_worker.go index f548ab1e5..5e90ec375 100644 --- a/zboxcore/sdk/upload_worker.go +++ b/zboxcore/sdk/upload_worker.go @@ -19,12 +19,13 @@ type UploadOperation struct { opts []ChunkedUploadOption refs []*fileref.FileRef isUpdate bool + isWebstreaming bool statusCallback StatusCallback opCode int } func (uo *UploadOperation) Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error) { - cu, err := CreateChunkedUpload(uo.workdir, allocObj, uo.fileMeta, uo.fileReader, uo.isUpdate, false, false, connectionID, uo.opts...) + cu, err := CreateChunkedUpload(uo.workdir, allocObj, uo.fileMeta, uo.fileReader, uo.isUpdate, false, uo.isWebstreaming, connectionID, uo.opts...) if err != nil { uploadMask := zboxutil.NewUint128(1).Lsh(uint64(len(allocObj.Blobbers))).Sub64(1) return nil, uploadMask, err @@ -123,12 +124,13 @@ func (uo *UploadOperation) Error(allocObj *Allocation, consensus int, err error) } } -func NewUploadOperation(workdir string, fileMeta FileMeta, fileReader io.Reader, isUpdate bool, opts ...ChunkedUploadOption) *UploadOperation { +func NewUploadOperation(workdir string, fileMeta FileMeta, fileReader io.Reader, isUpdate, isWebstreaming bool, opts ...ChunkedUploadOption) *UploadOperation { uo := &UploadOperation{} uo.workdir = workdir uo.fileMeta = fileMeta uo.fileReader = fileReader uo.opts = opts uo.isUpdate = isUpdate + uo.isWebstreaming = isWebstreaming return uo } From c3da61b1c8ba69f34d6e9ae153ca3e905a8d5b2b Mon Sep 17 00:00:00 2001 From: din-mukhammed Date: Sat, 26 Aug 2023 19:28:02 +0600 Subject: [PATCH 5/9] fix typos --- mobilesdk/zbox/storage.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mobilesdk/zbox/storage.go b/mobilesdk/zbox/storage.go index dc043cd3b..3e88bb6ef 100644 --- a/mobilesdk/zbox/storage.go +++ b/mobilesdk/zbox/storage.go @@ -376,7 +376,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str encrypts := make([]bool, totalUploads) chunkNumbers := make([]int, totalUploads) isUpdates := make([]bool, totalUploads) - isWebstreming := make([]bool, totalUploads) + isWebstreaming := make([]bool, totalUploads) for idx, option := range options { filePaths[idx] = option.FilePath fileNames[idx] = option.FileName @@ -385,7 +385,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str chunkNumbers[idx] = option.ChunkNumber encrypts[idx] = option.Encrypt isUpdates[idx] = true - isWebstreming[idx] = option.IsWebstreaming + isWebstreaming[idx] = option.IsWebstreaming } if err != nil { return err @@ -395,7 +395,7 @@ func MultiUpdate(allocationID string, workdir string, jsonMultiUploadOptions str if err != nil { return err } - return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, isWebstreming, &StatusCallbackWrapped{Callback: statusCb}) + return a.StartMultiUpload(workdir, filePaths, fileNames, thumbnailPaths, encrypts, chunkNumbers, remotePaths, isUpdates, isWebstreaming, &StatusCallbackWrapped{Callback: statusCb}) } From a30751bb73337b5ec5130e5f596e73d1940a66e8 Mon Sep 17 00:00:00 2001 From: din-mukhammed Date: Sat, 26 Aug 2023 22:45:00 +0600 Subject: [PATCH 6/9] add webstreaming to wasm multiupload --- wasmsdk/blobber.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/wasmsdk/blobber.go b/wasmsdk/blobber.go index 7b06e9121..e32b0a8a4 100644 --- a/wasmsdk/blobber.go +++ b/wasmsdk/blobber.go @@ -475,6 +475,7 @@ type BulkUploadOption struct { Webstreaming bool `json:"webstreaming,omitempty"` IsUpdate bool `json:"isUpdate,omitempty"` IsRepair bool `json:"isRepair,omitempty"` + IsWebstreaming bool `json:"isWebstreaming,omitempty"` NumBlocks int `json:"numBlocks,omitempty"` FileSize int64 `json:"fileSize,omitempty"` @@ -671,11 +672,12 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) { sdk.WithChunkNumber(numBlocks), } operationRequests[idx] = sdk.OperationRequest{ - FileMeta: fileMeta, - FileReader: fileReader, - OperationType: FileOperationInsert, - Opts: options, - Workdir: "/", + FileMeta: fileMeta, + FileReader: fileReader, + OperationType: FileOperationInsert, + Opts: options, + Workdir: "/", + isWebstreaming: option.IsWebstreaming, } } From 5e730b83118103aa0c1bfb3ffb84401af968823e Mon Sep 17 00:00:00 2001 From: din-mukhammed Date: Sat, 26 Aug 2023 22:50:30 +0600 Subject: [PATCH 7/9] fix typo --- wasmsdk/blobber.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wasmsdk/blobber.go b/wasmsdk/blobber.go index e32b0a8a4..e7a6b110f 100644 --- a/wasmsdk/blobber.go +++ b/wasmsdk/blobber.go @@ -677,7 +677,7 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) { OperationType: FileOperationInsert, Opts: options, Workdir: "/", - isWebstreaming: option.IsWebstreaming, + IsWebstreaming: option.IsWebstreaming, } } From 09fbda837d230c3decb9ad6698eca5ceae6ba690 Mon Sep 17 00:00:00 2001 From: din-mukhammed Date: Sun, 27 Aug 2023 03:46:57 +0600 Subject: [PATCH 8/9] cleanup --- zboxcore/sdk/allocation.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index e0f6ba744..c738a6015 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -856,11 +856,11 @@ func (a *Allocation) DoMultiOperation(operations []OperationRequest) error { majorErr := zboxutil.MajorError(connectionErrors) if majorErr != nil { return errors.New("consensus_not_met", - fmt.Sprintf("Multioperation (create_connection) failed. Required consensus %d got %d. Major error: %s", + fmt.Sprintf("Multioperation: create connection failed. Required consensus %d got %d. Major error: %s", mo.consensusThresh, mo.operationMask.CountOnes(), majorErr.Error())) } return errors.New("consensus_not_met", - fmt.Sprintf("Multioperation (create_connection) failed. Required consensus %d got %d", + fmt.Sprintf("Multioperation: create connection failed. Required consensus %d got %d", mo.consensusThresh, mo.operationMask.CountOnes())) } From 39deea65eca2da84e23639825cf6b0dbd86c45af Mon Sep 17 00:00:00 2001 From: din-mukhammed Date: Mon, 28 Aug 2023 01:54:10 +0600 Subject: [PATCH 9/9] fix completed callback --- zboxcore/sdk/upload_worker.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zboxcore/sdk/upload_worker.go b/zboxcore/sdk/upload_worker.go index 5e90ec375..6ce700cc9 100644 --- a/zboxcore/sdk/upload_worker.go +++ b/zboxcore/sdk/upload_worker.go @@ -32,6 +32,8 @@ func (uo *UploadOperation) Process(allocObj *Allocation, connectionID string) ([ } uo.statusCallback = cu.statusCallback uo.opCode = cu.opCode + uo.fileMeta = cu.fileMeta + uo.fileReader = cu.fileReader err = cu.process() if err != nil {