From 7bc12f63e19c7947a49051aba9ef5918be1aae71 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Fri, 18 Aug 2023 02:49:39 +0530 Subject: [PATCH 1/7] add shard log --- zboxcore/sdk/downloadworker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/zboxcore/sdk/downloadworker.go b/zboxcore/sdk/downloadworker.go index 9e16a60c9..6162222d8 100644 --- a/zboxcore/sdk/downloadworker.go +++ b/zboxcore/sdk/downloadworker.go @@ -94,6 +94,7 @@ func (req *DownloadRequest) removeFromMask(pos uint64) { func (req *DownloadRequest) getBlocksDataFromBlobbers(startBlock, totalBlock int64) ([][][]byte, error) { shards := make([][][]byte, totalBlock) + l.Logger.Info("[downloadReq]", "shards", totalBlock, "blobbers", len(req.blobbers)) for i := range shards { shards[i] = make([][]byte, len(req.blobbers)) } From 3f1d555f06e7fb5ea103a9e222f7f2dd15940280 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Fri, 18 Aug 2023 02:55:42 +0530 Subject: [PATCH 2/7] mend --- zboxcore/sdk/downloadworker.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/zboxcore/sdk/downloadworker.go b/zboxcore/sdk/downloadworker.go index 6162222d8..6e5fb97ae 100644 --- a/zboxcore/sdk/downloadworker.go +++ b/zboxcore/sdk/downloadworker.go @@ -286,6 +286,10 @@ func (req *DownloadRequest) decodeEC(shards [][]byte) (data []byte, isValid bool // fillShards will fill `shards` with data from blobbers that belongs to specific // blockNumber and blobber's position index in an allocation func (req *DownloadRequest) fillShards(shards [][][]byte, result *downloadBlock) (err error) { + if len(result.BlockChunks) > len(shards) { + l.Logger.Error("Invalid data received from blobber", result.idx) + return errors.New("invalid_data", "Invalid data received from blobber") + } for i := 0; i < len(result.BlockChunks); i++ { var data []byte if req.encryptedKey != "" { From c76e7eb6d2e38f04dff8d2c69ab244fe46ce5262 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Fri, 18 Aug 2023 03:01:12 +0530 Subject: [PATCH 3/7] add encrypt log --- zboxcore/sdk/downloadworker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zboxcore/sdk/downloadworker.go b/zboxcore/sdk/downloadworker.go index 6e5fb97ae..dd05aa58f 100644 --- a/zboxcore/sdk/downloadworker.go +++ b/zboxcore/sdk/downloadworker.go @@ -94,7 +94,7 @@ func (req *DownloadRequest) removeFromMask(pos uint64) { func (req *DownloadRequest) getBlocksDataFromBlobbers(startBlock, totalBlock int64) ([][][]byte, error) { shards := make([][][]byte, totalBlock) - l.Logger.Info("[downloadReq]", "shards", totalBlock, "blobbers", len(req.blobbers)) + l.Logger.Info("[downloadReq]", "shards", totalBlock, "blobbers", len(req.blobbers), "encryption", req.encryptedKey != "") for i := range shards { shards[i] = make([][]byte, len(req.blobbers)) } From 4a85dcb2e56b0187a9811255b64e1ba8daf8d894 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Fri, 18 Aug 2023 19:11:19 +0530 Subject: [PATCH 4/7] add size tp upload data --- zboxcore/sdk/allocation.go | 4 ++-- zboxcore/sdk/chunked_upload.go | 10 ++++++---- zboxcore/sdk/chunked_upload_form_builder.go | 5 +++-- zboxcore/sdk/chunked_upload_form_builder_bench_test.go | 2 +- zboxcore/sdk/chunked_upload_model.go | 1 + 5 files changed, 13 insertions(+), 9 deletions(-) diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 72e527244..1e21dd3f0 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -1064,6 +1064,8 @@ func (a *Allocation) addAndGenerateDownloadRequest( localFilePath string, ) error { var connectionID string + a.mutex.Lock() + defer a.mutex.Unlock() if len(a.downloadRequests) > 0 { connectionID = a.downloadRequests[0].connectionID } else { @@ -1075,7 +1077,6 @@ func (a *Allocation) addAndGenerateDownloadRequest( if err != nil { return err } - a.mutex.Lock() a.downloadProgressMap[remotePath] = downloadReq a.downloadRequests = append(a.downloadRequests, downloadReq) if isFinal { @@ -1085,7 +1086,6 @@ func (a *Allocation) addAndGenerateDownloadRequest( a.processReadMarker(downloadOps) }() } - a.mutex.Unlock() return nil } diff --git a/zboxcore/sdk/chunked_upload.go b/zboxcore/sdk/chunked_upload.go index 0efc55065..4212e5f01 100644 --- a/zboxcore/sdk/chunked_upload.go +++ b/zboxcore/sdk/chunked_upload.go @@ -205,8 +205,8 @@ func CreateChunkedUpload( } su.loadProgress() - - su.fileHasher = CreateHasher(getShardSize(su.fileMeta.ActualSize, su.allocationObj.DataShards, su.encryptOnUpload)) + su.shardSize = getShardSize(su.fileMeta.ActualSize, su.allocationObj.DataShards, su.encryptOnUpload) + su.fileHasher = CreateHasher(su.shardSize) // encrypt option has been changed. upload it from scratch // chunkSize has been changed. upload it from scratch @@ -318,6 +318,8 @@ type ChunkedUpload struct { shardUploadedSize int64 // shardUploadedThumbnailSize how much thumbnail bytes a shard has. it is original size shardUploadedThumbnailSize int64 + // size of shard + shardSize int64 // statusCallback trigger progress on StatusCallback statusCallback StatusCallback @@ -385,7 +387,7 @@ func (su *ChunkedUpload) createUploadProgress(connectionId string) { for i := 0; i < len(su.progress.Blobbers); i++ { su.progress.Blobbers[i] = &UploadBlobberStatus{ - Hasher: CreateHasher(getShardSize(su.fileMeta.ActualSize, su.allocationObj.DataShards, su.encryptOnUpload)), + Hasher: CreateHasher(su.shardSize), } } @@ -618,7 +620,7 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int, body, formData, err := su.formBuilder.Build( &su.fileMeta, blobber.progress.Hasher, su.progress.ConnectionID, su.chunkSize, chunkStartIndex, chunkEndIndex, isFinal, encryptedKey, - fileShards[pos], thumbnailChunkData, + fileShards[pos], thumbnailChunkData, su.shardSize, ) if err != nil { diff --git a/zboxcore/sdk/chunked_upload_form_builder.go b/zboxcore/sdk/chunked_upload_form_builder.go index c8fa20fdf..55515b8f8 100644 --- a/zboxcore/sdk/chunked_upload_form_builder.go +++ b/zboxcore/sdk/chunked_upload_form_builder.go @@ -22,7 +22,7 @@ type ChunkedUploadFormBuilder interface { fileMeta *FileMeta, hasher Hasher, connectionID string, chunkSize int64, chunkStartIndex, chunkEndIndex int, isFinal bool, encryptedKey string, fileChunksData [][]byte, - thumbnailChunkData []byte, + thumbnailChunkData []byte, shardSize int64, ) (*bytes.Buffer, ChunkedUploadFormMetadata, error) } @@ -48,7 +48,7 @@ func (b *chunkedUploadFormBuilder) Build( fileMeta *FileMeta, hasher Hasher, connectionID string, chunkSize int64, chunkStartIndex, chunkEndIndex int, isFinal bool, encryptedKey string, fileChunksData [][]byte, - thumbnailChunkData []byte, + thumbnailChunkData []byte, shardSize int64, ) (*bytes.Buffer, ChunkedUploadFormMetadata, error) { metadata := ChunkedUploadFormMetadata{ @@ -78,6 +78,7 @@ func (b *chunkedUploadFormBuilder) Build( ChunkStartIndex: chunkStartIndex, ChunkEndIndex: chunkEndIndex, UploadOffset: chunkSize * int64(chunkStartIndex), + Size: shardSize, } formWriter := multipart.NewWriter(body) diff --git a/zboxcore/sdk/chunked_upload_form_builder_bench_test.go b/zboxcore/sdk/chunked_upload_form_builder_bench_test.go index d4fbcf1b9..25cea9953 100644 --- a/zboxcore/sdk/chunked_upload_form_builder_bench_test.go +++ b/zboxcore/sdk/chunked_upload_form_builder_bench_test.go @@ -68,7 +68,7 @@ func BenchmarkChunkedUploadFormBuilder(b *testing.B) { fileBytes := buf[begin:end] - _, _, err := builder.Build(fileMeta, hasher, "connectionID", int64(bm.ChunkSize), chunkIndex, chunkIndex, isFinal, "", [][]byte{fileBytes}, nil) + _, _, err := builder.Build(fileMeta, hasher, "connectionID", int64(bm.ChunkSize), chunkIndex, chunkIndex, isFinal, "", [][]byte{fileBytes}, nil, getShardSize(fileMeta.ActualSize, 1, false)) if err != nil { b.Fatal(err) return diff --git a/zboxcore/sdk/chunked_upload_model.go b/zboxcore/sdk/chunked_upload_model.go index 669ac702e..b78e15c3c 100644 --- a/zboxcore/sdk/chunked_upload_model.go +++ b/zboxcore/sdk/chunked_upload_model.go @@ -78,6 +78,7 @@ type UploadFormData struct { ChunkEndIndex int `json:"chunk_end_index,omitempty"` // end index of chunks. all chunks MUST be uploaded one by one because of streaming merkle hash ChunkSize int64 `json:"chunk_size,omitempty"` // the size of a chunk. 64*1024 is default UploadOffset int64 `json:"upload_offset,omitempty"` // It is next position that new incoming chunk should be append to + Size int64 `json:"size"` // total size of shard } From df1bdadb1e0cfe599680b1f0230bb08f06da6721 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sat, 19 Aug 2023 00:39:52 +0530 Subject: [PATCH 5/7] fix upload --- zboxcore/sdk/chunked_upload.go | 67 +++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 26 deletions(-) diff --git a/zboxcore/sdk/chunked_upload.go b/zboxcore/sdk/chunked_upload.go index 4212e5f01..c929beaf2 100644 --- a/zboxcore/sdk/chunked_upload.go +++ b/zboxcore/sdk/chunked_upload.go @@ -8,6 +8,7 @@ import ( "net/http" "path/filepath" "sync" + "sync/atomic" "time" "errors" @@ -27,7 +28,6 @@ import ( "github.com/0chain/gosdk/zboxcore/zboxutil" "github.com/google/uuid" "github.com/klauspost/reedsolomon" - "golang.org/x/sync/errgroup" ) const ( @@ -589,7 +589,6 @@ func (su *ChunkedUpload) readChunks(num int) (*batchChunksData, error) { func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int, fileShards []blobberShards, thumbnailShards blobberShards, isFinal bool, uploadLength int64) error { - su.consensus.Reset() ctx, cancel := context.WithCancel(context.TODO()) @@ -600,50 +599,66 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int, encryptedKey = su.fileEncscheme.GetEncryptedKey() } + var errCount int32 + + wgErrors := make(chan error) + wgDone := make(chan bool) if len(fileShards) == 0 { return thrown.New("upload_failed", "Upload failed. No data to upload") } - - eg, _ := errgroup.WithContext(ctx) - for i := su.uploadMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(uint64(i.TrailingZeros())).Not()) { - pos := uint64(i.TrailingZeros()) + wg := &sync.WaitGroup{} + var pos uint64 + for i := su.uploadMask; !i.Equals64(0); i = i.And(zboxutil.NewUint128(1).Lsh(pos).Not()) { + pos = uint64(i.TrailingZeros()) blobber := su.blobbers[pos] blobber.progress.UploadLength += uploadLength var thumbnailChunkData []byte + if len(thumbnailShards) > 0 { thumbnailChunkData = thumbnailShards[pos] } - eg.Go(func() error { - return func(blobber *ChunkedUploadBlobber, pos uint64, thumbnailChunkData []byte) error { - body, formData, err := su.formBuilder.Build( - &su.fileMeta, blobber.progress.Hasher, su.progress.ConnectionID, - su.chunkSize, chunkStartIndex, chunkEndIndex, isFinal, encryptedKey, - fileShards[pos], thumbnailChunkData, su.shardSize, - ) - - if err != nil { - return err + wg.Add(1) + go func(b *ChunkedUploadBlobber, thumbnailChunkData []byte, pos uint64) { + defer wg.Done() + body, formData, err := su.formBuilder.Build( + &su.fileMeta, blobber.progress.Hasher, su.progress.ConnectionID, + su.chunkSize, chunkStartIndex, chunkEndIndex, isFinal, encryptedKey, + fileShards[pos], thumbnailChunkData, su.shardSize) + if err != nil { + errC := atomic.AddInt32(&errCount, 1) + if errC > int32(su.allocationObj.ParityShards-1) { // If atleast data shards + 1 number of blobbers can process the upload, it can be repaired later + wgErrors <- err } + } - err = blobber.sendUploadRequest(ctx, su, chunkEndIndex, isFinal, encryptedKey, body, formData, pos) - if err != nil { - logger.Logger.Error("error during sendUploadRequest", err) - return thrown.New("upload_failed", fmt.Sprintf("Upload failed. %s", err)) + err = b.sendUploadRequest(ctx, su, chunkEndIndex, isFinal, encryptedKey, body, formData, pos) + if err != nil { + logger.Logger.Error("error during sendUploadRequest", err) + errC := atomic.AddInt32(&errCount, 1) + if errC > int32(su.allocationObj.ParityShards-1) { // If atleast data shards + 1 number of blobbers can process the upload, it can be repaired later + wgErrors <- err } - return nil - }(blobber, pos, thumbnailChunkData) - }) + } + }(blobber, thumbnailChunkData, pos) } - if err := eg.Wait(); err != nil { - return err + go func() { + wg.Wait() + close(wgDone) + }() + + select { + case <-wgDone: + break + case err := <-wgErrors: + return thrown.New("upload_failed", fmt.Sprintf("Upload failed. %s", err)) } if !su.consensus.isConsensusOk() { - return thrown.New("consensus_not_met", fmt.Sprintf("Upload failed File not found for path %s. Required consensus at least %d, got %d", + return thrown.New("consensus_not_met", fmt.Sprintf("Upload failed File not found for path %s. Required consensus atleast %d, got %d", su.fileMeta.RemotePath, su.consensus.consensusThresh, su.consensus.getConsensus())) } From a83fba945c5656552454e44786656895a121d8f3 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sat, 19 Aug 2023 18:07:50 +0530 Subject: [PATCH 6/7] return on err --- zboxcore/sdk/chunked_upload.go | 1 + 1 file changed, 1 insertion(+) diff --git a/zboxcore/sdk/chunked_upload.go b/zboxcore/sdk/chunked_upload.go index c929beaf2..ba7feaf6a 100644 --- a/zboxcore/sdk/chunked_upload.go +++ b/zboxcore/sdk/chunked_upload.go @@ -631,6 +631,7 @@ func (su *ChunkedUpload) processUpload(chunkStartIndex, chunkEndIndex int, if errC > int32(su.allocationObj.ParityShards-1) { // If atleast data shards + 1 number of blobbers can process the upload, it can be repaired later wgErrors <- err } + return } err = b.sendUploadRequest(ctx, su, chunkEndIndex, isFinal, encryptedKey, body, formData, pos) From 6942f968e7239b7c8f4e5e8a0874d947e8020cda Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Wed, 23 Aug 2023 00:49:46 +0530 Subject: [PATCH 7/7] rmv logs --- zboxcore/sdk/downloadworker.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/zboxcore/sdk/downloadworker.go b/zboxcore/sdk/downloadworker.go index dc7821a53..f4e3f0679 100644 --- a/zboxcore/sdk/downloadworker.go +++ b/zboxcore/sdk/downloadworker.go @@ -94,7 +94,6 @@ func (req *DownloadRequest) removeFromMask(pos uint64) { func (req *DownloadRequest) getBlocksDataFromBlobbers(startBlock, totalBlock int64) ([][][]byte, error) { shards := make([][][]byte, totalBlock) - l.Logger.Info("[downloadReq]", "shards", totalBlock, "blobbers", len(req.blobbers), "encryption", req.encryptedKey != "") for i := range shards { shards[i] = make([][]byte, len(req.blobbers)) } @@ -275,10 +274,6 @@ func (req *DownloadRequest) decodeEC(shards [][]byte) (data []byte, err error) { // fillShards will fill `shards` with data from blobbers that belongs to specific // blockNumber and blobber's position index in an allocation func (req *DownloadRequest) fillShards(shards [][][]byte, result *downloadBlock) (err error) { - if len(result.BlockChunks) > len(shards) { - l.Logger.Error("Invalid data received from blobber", result.idx) - return errors.New("invalid_data", "Invalid data received from blobber") - } for i := 0; i < len(result.BlockChunks); i++ { var data []byte if req.encryptedKey != "" {