diff --git a/wasmsdk/jsbridge/file_writer.go b/wasmsdk/jsbridge/file_writer.go index 5fe081a87..e265117fa 100644 --- a/wasmsdk/jsbridge/file_writer.go +++ b/wasmsdk/jsbridge/file_writer.go @@ -5,7 +5,6 @@ package jsbridge import ( "errors" - "io" "io/fs" "syscall/js" @@ -34,29 +33,42 @@ func (w *FileWriter) Write(p []byte) (int, error) { //copy bytes to buf if w.bufWriteOffset+len(p) > len(w.buf) { - w.writeError = true - return 0, io.ErrShortWrite + err := w.flush() + if err != nil { + return 0, err + } } n := copy(w.buf[w.bufWriteOffset:], p) w.bufWriteOffset += n if w.bufWriteOffset == len(w.buf) { //write to file - if w.bufLen != len(w.buf) { - w.bufLen = len(w.buf) - w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen) + err := w.flush() + if err != nil { + return 0, err } - js.CopyBytesToJS(w.uint8Array, w.buf) - _, err := Await(w.writableStream.Call("write", w.uint8Array)) - if len(err) > 0 && !err[0].IsNull() { - w.writeError = true - return 0, errors.New("file_writer: " + err[0].String()) - } - //reset buffer - w.bufWriteOffset = 0 } return len(p), nil } +func (w *FileWriter) flush() error { + if w.bufWriteOffset == 0 { + return nil + } + if w.bufLen != w.bufWriteOffset { + w.bufLen = w.bufWriteOffset + w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen) + } + js.CopyBytesToJS(w.uint8Array, w.buf[:w.bufWriteOffset]) + _, err := Await(w.writableStream.Call("write", w.uint8Array)) + if len(err) > 0 && !err[0].IsNull() { + w.writeError = true + return errors.New("file_writer: " + err[0].String()) + } + //reset buffer + w.bufWriteOffset = 0 + return nil +} + // func (w *FileWriter) WriteAt(p []byte, offset int64) (int, error) { // uint8Array := js.Global().Get("Uint8Array").New(len(p)) // js.CopyBytesToJS(uint8Array, p) diff --git a/zboxcore/sdk/chunked_upload.go b/zboxcore/sdk/chunked_upload.go index f759bc726..1408fa539 100644 --- a/zboxcore/sdk/chunked_upload.go +++ b/zboxcore/sdk/chunked_upload.go @@ -711,14 +711,14 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error { if strings.Contains(err.Error(), "duplicate") { su.consensus.Done() errC := atomic.AddInt32(&su.addConsensus, 1) - if errC >= int32(su.consensus.consensusThresh) { + if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { wgErrors <- err } return } logger.Logger.Error("error during sendUploadRequest", err, " connectionID: ", su.progress.ConnectionID) errC := atomic.AddInt32(&errCount, 1) - if errC > int32(su.allocationObj.ParityShards-1) { // If atleast data shards + 1 number of blobbers can process the upload, it can be repaired later + if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { // If atleast data shards + 1 number of blobbers can process the upload, it can be repaired later wgErrors <- err } } diff --git a/zboxcore/sdk/chunked_upload_process_js.go b/zboxcore/sdk/chunked_upload_process_js.go index a88c595e5..5d2027160 100644 --- a/zboxcore/sdk/chunked_upload_process_js.go +++ b/zboxcore/sdk/chunked_upload_process_js.go @@ -272,7 +272,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er eventChan := allEventChan[pos] if eventChan.C == nil { errC := atomic.AddInt32(&errCount, 1) - if errC >= int32(su.consensus.consensusThresh) { + if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { wgErrors <- thrown.New("upload_failed", "Upload failed. Worker event channel not found") } return @@ -282,7 +282,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er if !ok { logger.Logger.Error("chan closed from: ", blobber.blobber.Baseurl) errC := atomic.AddInt32(&errCount, 1) - if errC >= int32(su.consensus.consensusThresh) { + if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { if su.ctx.Err() != nil { wgErrors <- context.Cause(su.ctx) } else { @@ -294,7 +294,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er msgType, data, err := jsbridge.GetMsgType(event) if err != nil { errC := atomic.AddInt32(&errCount, 1) - if errC >= int32(su.consensus.consensusThresh) { + if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { wgErrors <- errors.Wrap(err, "could not get msgType") } return @@ -304,7 +304,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er case "auth": if err := su.processWebWorkerAuthRequest(data, eventChan); err != nil { errC := atomic.AddInt32(&errCount, 1) - if errC >= int32(su.consensus.consensusThresh) { + if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { wgErrors <- err } return @@ -316,7 +316,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er isFinal, err = su.processWebWorkerUpload(data, blobber, pos) if err != nil { errC := atomic.AddInt32(&errCount, 1) - if errC >= int32(su.consensus.consensusThresh) { + if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { wgErrors <- err } } else {