Skip to content

Commit

Permalink
Merge branch 'sprint-july-4' into case-3-and-case-4
Browse files Browse the repository at this point in the history
  • Loading branch information
devyetii authored Aug 17, 2023
2 parents 58d406f + cb1f9c2 commit 50cb16f
Show file tree
Hide file tree
Showing 18 changed files with 146 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func (nf *UploadFileChanger) ApplyChange(ctx context.Context, rootRef *reference
found := false
for _, child := range dirRef.Children {
if child.Name == fields[i] {
if child.Type != reference.DIRECTORY {
return nil, common.NewError("invalid_reference_path", "Reference path has invalid ref type")
}
dirRef = child
dirRef.UpdatedAt = ts
dirRef.HashToBeComputed = true
Expand Down
9 changes: 7 additions & 2 deletions code/go/0chain.net/blobbercore/filestore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
PreCommitDir = "precommit"
MerkleChunkSize = 64
ChunkSize = 64 * KB
BufferSize = 10 * MB
)

func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, infile multipart.File) (*FileOutputData, error) {
Expand Down Expand Up @@ -243,7 +244,12 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)

fileSize := rStat.Size()
hasher := GetNewCommitHasher(fileSize)
_, err = io.Copy(hasher, r)
bufSize := BufferSize
if fileSize < BufferSize {
bufSize = int(fileSize)
}
buffer := make([]byte, bufSize)
_, err = io.CopyBuffer(hasher, r, buffer)
if err != nil {
return false, common.NewError("read_write_error", err.Error())
}
Expand All @@ -261,7 +267,6 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
if err != nil {
return false, common.NewError("validation_hash_calculation_error", err.Error())
}

fmtRoot := hex.EncodeToString(fmtRootBytes)
validationRoot := hex.EncodeToString(validationRootBytes)

Expand Down
59 changes: 52 additions & 7 deletions code/go/0chain.net/blobbercore/filestore/tree_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"math"
"sync"

"github.com/0chain/gosdk/core/util"
"golang.org/x/crypto/sha3"
Expand Down Expand Up @@ -402,30 +403,74 @@ func getNewValidationTree(dataSize int64) *validationTree {
type commitHasher struct {
fmt *fixedMerkleTree
vt *validationTree
writer io.Writer
isInitialized bool
}

func GetNewCommitHasher(dataSize int64) *commitHasher {
c := new(commitHasher)
c.fmt = getNewFixedMerkleTree()
c.vt = getNewValidationTree(dataSize)
c.writer = io.MultiWriter(c.fmt, c.vt)
c.isInitialized = true
return c
}

func (c *commitHasher) Write(b []byte) (int, error) {
return c.writer.Write(b)
var (
wg sync.WaitGroup
errChan = make(chan error, 2)
n int
)
wg.Add(2)
go func() {
_, err := c.fmt.Write(b)
if err != nil {
errChan <- err
}

wg.Done()
}()
go func() {
written, err := c.vt.Write(b)
if err != nil {
errChan <- err
}
n = written
wg.Done()
}()
wg.Wait()
close(errChan)
for err := range errChan {
return n, err
}
return n, nil
}

func (c *commitHasher) Finalize() error {
err := c.fmt.Finalize()
if err != nil {
var (
wg sync.WaitGroup
errChan = make(chan error, 2)
)
wg.Add(2)
go func() {
err := c.fmt.Finalize()
if err != nil {
errChan <- err
}
wg.Done()
}()
go func() {
err := c.vt.Finalize()
if err != nil {
errChan <- err
}
wg.Done()
}()
wg.Wait()
close(errChan)
for err := range errChan {
return err
}

return c.vt.Finalize()
return nil
}

func (c *commitHasher) GetFixedMerkleRoot() string {
Expand Down
5 changes: 0 additions & 5 deletions code/go/0chain.net/blobbercore/handler/file_command_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,6 @@ func (cmd *UploadFileCommand) IsValidated(ctx context.Context, req *http.Request
return common.NewError("invalid_operation", "Operation needs to be performed by the owner or the payer of the allocation")
}

if err := validateParentPathType(ctx, allocationObj.ID, fileChanger.Path); err != nil {
return err
}

_, thumbHeader, _ := req.FormFile(UploadThumbnailFile)
if thumbHeader != nil {
if thumbHeader.Size > MaxThumbnailSize {
Expand All @@ -100,7 +96,6 @@ func (cmd *UploadFileCommand) IsValidated(ctx context.Context, req *http.Request
}

cmd.fileChanger = fileChanger

return nil
}

Expand Down
1 change: 0 additions & 1 deletion code/go/0chain.net/blobbercore/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ func WithConnection(handler common.JSONResponderF) common.JSONResponderF {

return err
})

return resp, err
}
}
Expand Down
20 changes: 10 additions & 10 deletions code/go/0chain.net/blobbercore/handler/handler_download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func TestHandlers_Download(t *testing.T) {

filePathHash := fileref.GetReferenceLookup(alloc.Tx, "/file.txt")
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "reference_objects" WHERE`)).
WithArgs(alloc.ID, filePathHash).WillReturnError(gorm.ErrRecordNotFound)
WithArgs(filePathHash).WillReturnError(gorm.ErrRecordNotFound)

},
wantCode: http.StatusBadRequest,
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestHandlers_Download(t *testing.T) {

filePathHash := fileref.GetReferenceLookup(alloc.Tx, "/file.txt")
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "reference_objects" WHERE`)).
WithArgs(alloc.ID, filePathHash).
WithArgs(filePathHash).
WillReturnRows(
sqlmock.NewRows([]string{"path", "type", "lookup_hash", "validation_root"}).
AddRow("/file.txt", "f", filePathHash, "abcd"),
Expand Down Expand Up @@ -517,7 +517,7 @@ func TestHandlers_Download(t *testing.T) {
filePathHash := fileref.GetReferenceLookup(alloc.Tx, "/file.txt")

mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "reference_objects" WHERE`)).
WithArgs(alloc.ID, filePathHash).
WithArgs(filePathHash).
WillReturnRows(
sqlmock.NewRows([]string{"path", "type", "path_hash", "lookup_hash", "validation_root", "encrypted_key", "chunk_size"}).
AddRow("/file.txt", "f", filePathHash, filePathHash, "validation_root", "qCj3sXXeXUAByi1ERIbcfXzWN75dyocYzyRXnkStXio=", 65536),
Expand Down Expand Up @@ -612,7 +612,7 @@ func TestHandlers_Download(t *testing.T) {

filePathHash := fileref.GetReferenceLookup(alloc.Tx, "/file.txt")
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "reference_objects" WHERE`)).
WithArgs(alloc.ID, filePathHash).
WithArgs(filePathHash).
WillReturnRows(
sqlmock.NewRows([]string{"path", "type", "path_hash", "lookup_hash", "validation_root", "encrypted_key", "chunk_size"}).
AddRow("/file.txt", "f", filePathHash, filePathHash, "validation_root", ownerScheme.GetEncryptedKey(), 65536),
Expand Down Expand Up @@ -726,15 +726,15 @@ func TestHandlers_Download(t *testing.T) {

filePathHash := fileref.GetReferenceLookup(alloc.Tx, "/file.txt")
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "reference_objects" WHERE`)).
WithArgs(alloc.ID, filePathHash).
WithArgs(filePathHash).
WillReturnRows(
sqlmock.NewRows([]string{"path", "type", "path_hash", "lookup_hash", "validation_root", "encrypted_key", "parent_path", "chunk_size"}).
AddRow("/file.txt", "f", filePathHash, filePathHash, "validation_root", ownerScheme.GetEncryptedKey(), "/", fileref.CHUNK_SIZE),
)

rootPathHash := fileref.GetReferenceLookup(alloc.Tx, "/")
mock.ExpectQuery(regexp.QuoteMeta(`SELECT "id","path" FROM "reference_objects" WHERE`)).
WithArgs(alloc.ID, rootPathHash).
WithArgs(rootPathHash).
WillReturnRows(
sqlmock.NewRows([]string{"path", "type", "path_hash", "lookup_hash", "validation_root", "encrypted_key", "parent_path"}).
AddRow("/", "d", rootPathHash, rootPathHash, "validation_root", "", "."),
Expand Down Expand Up @@ -846,15 +846,15 @@ func TestHandlers_Download(t *testing.T) {

filePathHash := fileref.GetReferenceLookup(alloc.Tx, "/folder1/subfolder1/file.txt")
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "reference_objects" WHERE`)).
WithArgs(alloc.ID, filePathHash).
WithArgs(filePathHash).
WillReturnRows(
sqlmock.NewRows([]string{"path", "type", "path_hash", "lookup_hash", "validation_root", "encrypted_key", "parent_path", "chunk_size"}).
AddRow("/folder1/subfolder1/file.txt", "f", filePathHash, filePathHash, "validation_root", ownerScheme.GetEncryptedKey(), "/folder1/subfolder1", filestore.CHUNK_SIZE),
)

rootPathHash := fileref.GetReferenceLookup(alloc.Tx, "/folder1")
mock.ExpectQuery(regexp.QuoteMeta(`SELECT "id","path" FROM "reference_objects" WHERE`)).
WithArgs(alloc.ID, rootPathHash).
WithArgs(rootPathHash).
WillReturnRows(
sqlmock.NewRows([]string{"path", "type", "path_hash", "lookup_hash", "validation_root", "encrypted_key", "parent_path"}).
AddRow("/folder1", "d", rootPathHash, rootPathHash, "validation_root", "", "."),
Expand Down Expand Up @@ -965,15 +965,15 @@ func TestHandlers_Download(t *testing.T) {

filePathHash := fileref.GetReferenceLookup(alloc.Tx, "/folder2/subfolder1/file.txt")
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "reference_objects" WHERE`)).
WithArgs(alloc.ID, filePathHash).
WithArgs(filePathHash).
WillReturnRows(
sqlmock.NewRows([]string{"path", "type", "path_hash", "lookup_hash", "validation_root", "encrypted_key", "parent_path", "chunk_size"}).
AddRow("/file.txt", "f", filePathHash, filePathHash, "validation_root", ownerScheme.GetEncryptedKey(), "/folder2/subfolder1", fileref.CHUNK_SIZE),
)

rootPathHash := fileref.GetReferenceLookup(alloc.Tx, "/folder1")
mock.ExpectQuery(regexp.QuoteMeta(`SELECT "id","path" FROM "reference_objects" WHERE`)).
WithArgs(alloc.ID, rootPathHash).
WithArgs(rootPathHash).
WillReturnRows(
sqlmock.NewRows([]string{"path", "type", "path_hash", "lookup_hash", "validation_root", "encrypted_key", "parent_path"}).
AddRow("/folder1", "d", rootPathHash, rootPathHash, "validation_root", "", "/"),
Expand Down
8 changes: 4 additions & 4 deletions code/go/0chain.net/blobbercore/handler/handler_share_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestHandlers_Share(t *testing.T) {
)

mock.ExpectQuery(regexp.QuoteMeta(`SELECT "id","path","lookup_hash","type","name" FROM "reference_objects" WHERE`)).
WithArgs(alloc.Tx, "f15383a1130bd2fae1e52a7a15c432269eeb7def555f1f8b9b9a28bd9611362c").
WithArgs("f15383a1130bd2fae1e52a7a15c432269eeb7def555f1f8b9b9a28bd9611362c").
WillReturnRows(
sqlmock.NewRows([]string{"path", "lookup_hash"}).
AddRow("/file.txt", "f15383a1130bd2fae1e52a7a15c432269eeb7def555f1f8b9b9a28bd9611362c"),
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestHandlers_Share(t *testing.T) {
)

mock.ExpectQuery(regexp.QuoteMeta(`SELECT "id","path","lookup_hash","type","name" FROM "reference_objects" WHERE`)).
WithArgs(alloc.Tx, "f15383a1130bd2fae1e52a7a15c432269eeb7def555f1f8b9b9a28bd9611362c").
WithArgs("f15383a1130bd2fae1e52a7a15c432269eeb7def555f1f8b9b9a28bd9611362c").
WillReturnRows(
sqlmock.NewRows([]string{"path", "lookup_hash"}).
AddRow("/file.txt", "f15383a1130bd2fae1e52a7a15c432269eeb7def555f1f8b9b9a28bd9611362c"),
Expand Down Expand Up @@ -437,7 +437,7 @@ func TestHandlers_Share(t *testing.T) {

filePathHash := fileref.GetReferenceLookup(alloc.Tx, "/file.txt")
mock.ExpectQuery(regexp.QuoteMeta(`SELECT "id","type" FROM "reference_objects" WHERE`)).
WithArgs(alloc.Tx, filePathHash).
WithArgs(filePathHash).
WillReturnRows(
sqlmock.NewRows([]string{"path", "lookup_hash"}).
AddRow("/file.txt", filePathHash),
Expand Down Expand Up @@ -512,7 +512,7 @@ func TestHandlers_Share(t *testing.T) {

filePathHash := fileref.GetReferenceLookup(alloc.Tx, "/file.txt")
mock.ExpectQuery(regexp.QuoteMeta(`SELECT "id","type" FROM "reference_objects" WHERE`)).
WithArgs(alloc.Tx, filePathHash).
WithArgs(filePathHash).
WillReturnRows(
sqlmock.NewRows([]string{"path", "lookup_hash"}).
AddRow("/file.txt", filePathHash),
Expand Down
22 changes: 11 additions & 11 deletions code/go/0chain.net/blobbercore/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,11 @@ func TestHandlers_Requiring_Signature(t *testing.T) {
sqlmock.NewRows([]string{"id", "allocation_id"}).
AddRow(alloc.Terms[0].ID, alloc.Terms[0].AllocationID),
)
mock.ExpectQuery(regexp.QuoteMeta(`SELECT count(*) FROM "reference_objects"`)).
WithArgs(aa, aa).
mock.ExpectQuery(regexp.QuoteMeta(`SELECT EXISTS(SELECT 1 FROM reference_objects WHERE lookup_hash=$1 AND deleted_at is NULL) AS found`)).
WithArgs(aa).
WillReturnRows(
sqlmock.NewRows([]string{"count"}).
AddRow(0),
sqlmock.NewRows([]string{"found"}).
AddRow(false),
)
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "allocation_connections" WHERE`)).
WithArgs(connectionID, alloc.ID, alloc.OwnerID, allocation.DeletedConnection).
Expand Down Expand Up @@ -560,7 +560,7 @@ func TestHandlers_Requiring_Signature(t *testing.T) {

lookUpHash := reference.GetReferenceLookup(alloc.ID, path)
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "reference_objects" WHERE`)).
WithArgs(alloc.ID, lookUpHash).
WithArgs(lookUpHash).
WillReturnRows(
sqlmock.NewRows([]string{"type"}).
AddRow(reference.FILE),
Expand Down Expand Up @@ -641,7 +641,7 @@ func TestHandlers_Requiring_Signature(t *testing.T) {

lookUpHash := reference.GetReferenceLookup(alloc.ID, path)
mock.ExpectQuery(regexp.QuoteMeta(`SELECT "id","name","path","hash","size","validation_root","fixed_merkle_root" FROM "reference_objects" WHERE`)).
WithArgs(alloc.ID, lookUpHash).
WithArgs(lookUpHash).
WillReturnRows(
sqlmock.NewRows([]string{"type"}).
AddRow(reference.FILE),
Expand Down Expand Up @@ -724,7 +724,7 @@ func TestHandlers_Requiring_Signature(t *testing.T) {

lookUpHash := reference.GetReferenceLookup(alloc.ID, path)
mock.ExpectQuery(regexp.QuoteMeta(`SELECT "id","name","path","hash","size","validation_root","fixed_merkle_root" FROM "reference_objects" WHERE`)).
WithArgs(alloc.ID, lookUpHash).
WithArgs(lookUpHash).
WillReturnRows(
sqlmock.NewRows([]string{"type", "name"}).
AddRow(reference.FILE, "path"),
Expand Down Expand Up @@ -833,11 +833,11 @@ func TestHandlers_Requiring_Signature(t *testing.T) {
sqlmock.NewRows([]string{"id", "allocation_id"}).
AddRow(alloc.Terms[0].ID, alloc.Terms[0].AllocationID),
)
mock.ExpectQuery(regexp.QuoteMeta(`SELECT count(*) FROM "reference_objects"`)).
WithArgs(aa, aa).
mock.ExpectQuery(regexp.QuoteMeta(`SELECT EXISTS(SELECT 1 FROM reference_objects WHERE lookup_hash=$1 AND deleted_at is NULL) AS found`)).
WithArgs(aa).
WillReturnRows(
sqlmock.NewRows([]string{"count"}).
AddRow(0),
sqlmock.NewRows([]string{"found"}).
AddRow(false),
)
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "allocation_connections" WHERE`)).
WithArgs(connectionID, alloc.ID, alloc.OwnerID, allocation.DeletedConnection).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,12 +660,16 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
return nil, common.NewError("allocation_write_error", "Error persisting the allocation object")
}

elapsedSaveAllocation := time.Since(startTime) - elapsedAllocation - elapsedGetLock -
elapsedGetConnObj - elapsedVerifyWM - elapsedWritePreRedeem - elapsedApplyChanges

err = connectionObj.CommitToFileStore(ctx)
if err != nil {
if !errors.Is(common.ErrFileWasDeleted, err) {
return nil, common.NewError("file_store_error", "Error committing to file store. "+err.Error())
}
}
elapsedCommitStore := time.Since(startTime) - elapsedAllocation - elapsedGetLock - elapsedGetConnObj - elapsedVerifyWM - elapsedWritePreRedeem - elapsedApplyChanges - elapsedSaveAllocation
err = writemarkerEntity.SendToChan(ctx)
if err != nil {
return nil, common.NewError("write_marker_error", "Error redeeming the write marker")
Expand All @@ -684,9 +688,6 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
input := connectionObj.Changes[0].Input

//Delete connection object and its changes
for _, c := range connectionObj.Changes {
db.Delete(c)
}

db.Delete(connectionObj)
go allocation.DeleteConnectionObjEntry(connectionID)
Expand All @@ -702,6 +703,8 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
zap.Duration("write-pre-redeem", elapsedWritePreRedeem),
zap.Duration("move-to-filestore", elapsedMoveToFilestore),
zap.Duration("apply-changes", elapsedApplyChanges),
zap.Duration("save-allocation", elapsedSaveAllocation),
zap.Duration("commit-store", elapsedCommitStore),
zap.Duration("total", time.Since(startTime)),
)
return &result, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestDownloadFile(t *testing.T) {
mocket.Catcher.NewMock().OneTime().WithQuery(
`SELECT * FROM "reference_objects" WHERE`,
).WithArgs(
"mock_allocation_id", p.inData.pathHash,
p.inData.pathHash,
).WithReply(
[]map[string]interface{}{{
"allocation_id": p.allocation.ID,
Expand All @@ -244,7 +244,7 @@ func TestDownloadFile(t *testing.T) {
require.EqualValues(t, p.payerId.ClientID, args[0].Value)
require.EqualValues(t, mockAllocationId, args[1].Value)
require.EqualValues(t, mockBlobberId, args[2].Value)
}).WithArgs(mockAllocationId).WithReply(
}).WithReply(
[]map[string]interface{}{{
"path": "",
"client_id": p.payerId.ClientID,
Expand Down
Loading

0 comments on commit 50cb16f

Please sign in to comment.