Skip to content

Commit

Permalink
Merge pull request #1691 from 0chain/hotfix/short-write
Browse files Browse the repository at this point in the history
Flush data
  • Loading branch information
dabasov authored Nov 25, 2024
2 parents d4c44a6 + 0a4662a commit 2e760c7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 21 deletions.
40 changes: 26 additions & 14 deletions wasmsdk/jsbridge/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package jsbridge

import (
"errors"
"io"
"io/fs"
"syscall/js"

Expand Down Expand Up @@ -34,29 +33,42 @@ func (w *FileWriter) Write(p []byte) (int, error) {

//copy bytes to buf
if w.bufWriteOffset+len(p) > len(w.buf) {
w.writeError = true
return 0, io.ErrShortWrite
err := w.flush()
if err != nil {
return 0, err
}
}
n := copy(w.buf[w.bufWriteOffset:], p)
w.bufWriteOffset += n
if w.bufWriteOffset == len(w.buf) {
//write to file
if w.bufLen != len(w.buf) {
w.bufLen = len(w.buf)
w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen)
err := w.flush()
if err != nil {
return 0, err
}
js.CopyBytesToJS(w.uint8Array, w.buf)
_, err := Await(w.writableStream.Call("write", w.uint8Array))
if len(err) > 0 && !err[0].IsNull() {
w.writeError = true
return 0, errors.New("file_writer: " + err[0].String())
}
//reset buffer
w.bufWriteOffset = 0
}
return len(p), nil
}

func (w *FileWriter) flush() error {
if w.bufWriteOffset == 0 {
return nil
}
if w.bufLen != w.bufWriteOffset {
w.bufLen = w.bufWriteOffset
w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen)
}
js.CopyBytesToJS(w.uint8Array, w.buf[:w.bufWriteOffset])
_, err := Await(w.writableStream.Call("write", w.uint8Array))
if len(err) > 0 && !err[0].IsNull() {
w.writeError = true
return errors.New("file_writer: " + err[0].String())
}
//reset buffer
w.bufWriteOffset = 0
return nil
}

// func (w *FileWriter) WriteAt(p []byte, offset int64) (int, error) {
// uint8Array := js.Global().Get("Uint8Array").New(len(p))
// js.CopyBytesToJS(uint8Array, p)
Expand Down
4 changes: 2 additions & 2 deletions zboxcore/sdk/chunked_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,14 +711,14 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error {
if strings.Contains(err.Error(), "duplicate") {
su.consensus.Done()
errC := atomic.AddInt32(&su.addConsensus, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- err
}
return
}
logger.Logger.Error("error during sendUploadRequest", err, " connectionID: ", su.progress.ConnectionID)
errC := atomic.AddInt32(&errCount, 1)
if errC > int32(su.allocationObj.ParityShards-1) { // If atleast data shards + 1 number of blobbers can process the upload, it can be repaired later
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { // If atleast data shards + 1 number of blobbers can process the upload, it can be repaired later
wgErrors <- err
}
}
Expand Down
10 changes: 5 additions & 5 deletions zboxcore/sdk/chunked_upload_process_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
eventChan := allEventChan[pos]
if eventChan.C == nil {
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- thrown.New("upload_failed", "Upload failed. Worker event channel not found")
}
return
Expand All @@ -282,7 +282,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
if !ok {
logger.Logger.Error("chan closed from: ", blobber.blobber.Baseurl)
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
if su.ctx.Err() != nil {
wgErrors <- context.Cause(su.ctx)
} else {
Expand All @@ -294,7 +294,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
msgType, data, err := jsbridge.GetMsgType(event)
if err != nil {
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- errors.Wrap(err, "could not get msgType")
}
return
Expand All @@ -304,7 +304,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
case "auth":
if err := su.processWebWorkerAuthRequest(data, eventChan); err != nil {
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- err
}
return
Expand All @@ -316,7 +316,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
isFinal, err = su.processWebWorkerUpload(data, blobber, pos)
if err != nil {
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- err
}
} else {
Expand Down

0 comments on commit 2e760c7

Please sign in to comment.