Skip to content

Commit

Permalink
add more logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Hitenjain14 committed Aug 10, 2023
1 parent fbd15bf commit ddd6f9b
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package allocation

import (
"context"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"go.uber.org/zap"
)

// BaseFileChanger base file change processor
Expand Down Expand Up @@ -92,9 +95,12 @@ func (fc *BaseFileChanger) CommitToFileStore(ctx context.Context) error {
fileInputData.ValidationRoot = fc.ValidationRoot
fileInputData.FixedMerkleRoot = fc.FixedMerkleRoot
fileInputData.ChunkSize = fc.ChunkSize
start := time.Now()
_, err := filestore.GetFileStore().CommitWrite(fc.AllocationID, fc.ConnectionID, fileInputData)
if err != nil {
return common.NewError("file_store_error", "Error committing to file store. "+err.Error())
}
elapsed := time.Since(start)
logging.Logger.Info("CommitToFileStore", zap.String("path", fc.Path), zap.Duration("elapsed", elapsed))
return nil
}
55 changes: 34 additions & 21 deletions code/go/0chain.net/blobbercore/filestore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"strings"
"sync"
"syscall"
"time"

"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
Expand Down Expand Up @@ -180,14 +181,14 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
return false, common.NewError("file_create_error", err.Error())
}

r, err := os.OpenFile(tempFilePath, os.O_RDWR|os.O_APPEND, 0644)
r, err := os.Open(tempFilePath)
if err != nil {
f.Close()
return false, err
}

defer f.Close()

defer func() {
f.Close()
r.Close()
if err != nil {
os.Remove(preCommitPath)
Expand All @@ -214,7 +215,12 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
fileData.ThumbnailHash, hash))
}

err = os.Rename(tempFilePath, preCommitPath)
_, err = r.Seek(0, io.SeekStart)
if err != nil {
return false, err
}

_, err = io.Copy(f, r)
if err != nil {
return false, err
}
Expand All @@ -237,6 +243,7 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
}

fileSize := rStat.Size()
start := time.Now()
hasher := GetNewCommitHasher(fileSize)
_, err = io.Copy(hasher, r)
if err != nil {
Expand All @@ -247,16 +254,18 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
if err != nil {
return false, common.NewError("finalize_error", err.Error())
}
fmtRootBytes, err := hasher.fmt.CalculateRootAndStoreNodes(r)
elapsedHash := time.Since(start)
fmtRootBytes, err := hasher.fmt.CalculateRootAndStoreNodes(f)
if err != nil {
return false, common.NewError("fmt_hash_calculation_error", err.Error())
}
elapsedMerkle := time.Since(start) - elapsedHash

validationRootBytes, err := hasher.vt.CalculateRootAndStoreNodes(r)
validationRootBytes, err := hasher.vt.CalculateRootAndStoreNodes(f)
if err != nil {
return false, common.NewError("validation_hash_calculation_error", err.Error())
}

elapsedValidation := time.Since(start) - elapsedMerkle - elapsedHash
fmtRoot := hex.EncodeToString(fmtRootBytes)
validationRoot := hex.EncodeToString(validationRootBytes)

Expand All @@ -269,7 +278,12 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
"calculated validation root does not match with client's validation root")
}

err = os.Rename(tempFilePath, preCommitPath)
_, err = r.Seek(0, io.SeekStart)
if err != nil {
return false, common.NewError("seek_error", err.Error())
}

_, err = io.Copy(f, r)
if err != nil {
return false, common.NewError("write_error", err.Error())
}
Expand All @@ -285,6 +299,7 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
// 5. Move: It is Copy + Delete. Delete will not delete file if ref exists in database. i.e. copy would create
// ref that refers to this file therefore it will be skipped
fs.incrDecrAllocFileSizeAndNumber(allocID, fileSize, 1)
logging.Logger.Info("CommitWrite", zap.Duration("elapsedHash", elapsedHash), zap.Duration("elapsedMerkle", elapsedMerkle), zap.Duration("elapsedValidation", elapsedValidation), zap.Duration("elapsedTotal", time.Since(start)))
return true, nil
}

Expand Down Expand Up @@ -520,22 +535,23 @@ func (fs *FileStore) GetFileBlock(readBlockIn *ReadBlockInput) (*FileDownloadRes
vp := validationTreeProof{
dataSize: readBlockIn.FileSize,
}
_, err = file.Seek(-FMTSize, io.SeekEnd)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
}

nodes, indexes, err := vp.GetMerkleProofOfMultipleIndexes(file, nodesSize, startBlock, endBlock)
if err != nil {
return nil, common.NewError("get_merkle_proof_error", err.Error())
}

vmp.Nodes = nodes
vmp.Indexes = indexes
_, err = file.Seek(0, io.SeekStart)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
}
}

fileOffset := FMTSize + nodesSize + int64(startBlock)*ChunkSize

_, err = file.Seek(fileOffset, io.SeekStart)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
}

buffer := make([]byte, readBlockIn.NumBlocks*ChunkSize)
n, err := file.Read(buffer)
if err != nil && err != io.EOF {
Expand Down Expand Up @@ -586,18 +602,15 @@ func (fs *FileStore) GetBlocksMerkleTreeForChallenge(in *ChallengeReadBlockInput
dataSize: in.FileSize,
}

_, err = file.Seek(in.FileSize, io.SeekStart)
_, err = file.Seek(-in.FileSize, io.SeekEnd)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
}
merkleProof, err := fmp.GetMerkleProof(file)
if err != nil {
return nil, common.NewError("get_merkle_proof_error", err.Error())
}
_, err = file.Seek(0, io.SeekStart)
if err != nil {
return nil, common.NewError("seek_error", err.Error())
}

proofByte, err := fmp.GetLeafContent(file)
if err != nil {
return nil, common.NewError("get_leaf_content_error", err.Error())
Expand Down
29 changes: 9 additions & 20 deletions code/go/0chain.net/blobbercore/filestore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,6 @@ func TestGetMerkleTree(t *testing.T) {
finfo, _ := f.Stat()
fmt.Println("Size: ", finfo.Size())
mr, err := getFixedMerkleRoot(f, int64(size))
require.Equal(t, fixedMerkleRoot, mr)
require.Nil(t, err)
t.Logf("Merkle root: %s", mr)
allocID := randString(64)
Expand Down Expand Up @@ -908,15 +907,12 @@ func generateRandomDataAndStoreNodes(fPath string, size int64) (string, string,
}
defer f.Close()

_, err = f.Write(p)
if err != nil {
return "", "", err
}
cH := GetNewCommitHasher(size)
_, err = cH.Write(p)
if err != nil {
return "", "", err
}

err = cH.Finalize()
if err != nil {
return "", "", err
Expand All @@ -932,26 +928,26 @@ func generateRandomDataAndStoreNodes(fPath string, size int64) (string, string,
return "", "", err
}

_, err = f.Write(p)
if err != nil {
return "", "", err
}

return hex.EncodeToString(validationMerkleRoot), hex.EncodeToString(fixedMerkleRoot), nil
}

func getFixedMerkleRoot(r io.ReadSeeker, dataSize int64) (mr string, err error) {
_, err = r.Seek(0, io.SeekStart)
_, err = r.Seek(-dataSize, io.SeekEnd)
if err != nil {
return
}

fixedMT := util.NewFixedMerkleTree()
var count int
var dataRead int
mainloop:
for {
dataLeft := dataSize - int64(dataRead)
toRead := 64 * KB
if dataLeft < 64*KB {
toRead = int(dataLeft)
}
b := make([]byte, toRead)

b := make([]byte, 64*KB)
var n int
n, err = r.Read(b)
if err != nil {
Expand All @@ -963,13 +959,6 @@ mainloop:
}
return
}
dataRead += n
if toRead < 64*KB {
if n == 0 {
break
}
goto final
}
if n != 64*KB {
fmt.Println("n is ", n)
return "", errors.New("invalid byte length. Must be 64 KB")
Expand Down
7 changes: 5 additions & 2 deletions code/go/0chain.net/blobbercore/handler/file_command_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"path/filepath"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"

Expand Down Expand Up @@ -45,6 +46,7 @@ func (cmd *UploadFileCommand) GetPath() string {

// IsValidated validate request.
func (cmd *UploadFileCommand) IsValidated(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, clientID string) error {
start := time.Now()
if allocationObj.OwnerID != clientID && allocationObj.RepairerID != clientID {
return common.NewError("invalid_operation", "Operation needs to be performed by the owner or the payer of the allocation")
}
Expand All @@ -57,6 +59,7 @@ func (cmd *UploadFileCommand) IsValidated(ctx context.Context, req *http.Request
return common.NewError("invalid_parameters",
"Invalid parameters. Error parsing the meta data for upload."+err.Error())
}
elapsedUnmarshal := time.Since(start)

if fileChanger.Path == "/" {
return common.NewError("invalid_path", "Invalid path. Cannot upload to root directory")
Expand All @@ -72,7 +75,7 @@ func (cmd *UploadFileCommand) IsValidated(ctx context.Context, req *http.Request
logging.Logger.Error(err.Error())
return common.NewError("database_error", "Got db error while getting ref")
}

elapsedRefExist := time.Since(start) - elapsedUnmarshal
if isExist {
msg := fmt.Sprintf("File at path :%s: already exists", fileChanger.Path)
return common.NewError("duplicate_file", msg)
Expand All @@ -96,7 +99,7 @@ func (cmd *UploadFileCommand) IsValidated(ctx context.Context, req *http.Request
}

cmd.fileChanger = fileChanger

logging.Logger.Info("isValidated", zap.Duration("elapsedUnmarshal", elapsedUnmarshal), zap.Duration("elapsedRefExist", elapsedRefExist), zap.Duration("elapsed", time.Since(start)))
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,12 +660,16 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
return nil, common.NewError("allocation_write_error", "Error persisting the allocation object")
}

elapsedSaveAllocation := time.Since(startTime) - elapsedAllocation - elapsedGetLock -
elapsedGetConnObj - elapsedVerifyWM - elapsedWritePreRedeem - elapsedApplyChanges

err = connectionObj.CommitToFileStore(ctx)
if err != nil {
if !errors.Is(common.ErrFileWasDeleted, err) {
return nil, common.NewError("file_store_error", "Error committing to file store. "+err.Error())
}
}
elapsedCommitStore := time.Since(startTime) - elapsedAllocation - elapsedGetLock - elapsedGetConnObj - elapsedVerifyWM - elapsedWritePreRedeem - elapsedApplyChanges - elapsedSaveAllocation
err = writemarkerEntity.SendToChan(ctx)
if err != nil {
return nil, common.NewError("write_marker_error", "Error redeeming the write marker")
Expand All @@ -684,9 +688,6 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
input := connectionObj.Changes[0].Input

//Delete connection object and its changes
for _, c := range connectionObj.Changes {
db.Delete(c)
}

db.Delete(connectionObj)
go allocation.DeleteConnectionObjEntry(connectionID)
Expand All @@ -702,6 +703,8 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b
zap.Duration("write-pre-redeem", elapsedWritePreRedeem),
zap.Duration("move-to-filestore", elapsedMoveToFilestore),
zap.Duration("apply-changes", elapsedApplyChanges),
zap.Duration("save-allocation", elapsedSaveAllocation),
zap.Duration("commit-store", elapsedCommitStore),
zap.Duration("total", time.Since(startTime)),
)
return &result, nil
Expand Down
2 changes: 1 addition & 1 deletion goose/migrations/001_blobber_meta.sql
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ CREATE INDEX path_idx ON public.reference_objects USING btree (path);
--

ALTER TABLE ONLY public.allocation_changes
ADD CONSTRAINT fk_allocation_connections_changes FOREIGN KEY (connection_id) REFERENCES public.allocation_connections(id);
ADD CONSTRAINT fk_allocation_connections_changes FOREIGN KEY (connection_id) REFERENCES public.allocation_connections(id) ON DELETE CASCADE;


--
Expand Down

0 comments on commit ddd6f9b

Please sign in to comment.