Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate hash on upload #1217

Merged
merged 28 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3109e16
Merge pull request #1185 from 0chain/sprint-july-3
Kishan-Dhakan Jul 26, 2023
2443294
minor changes to chalk in zus overview
hm90121 Aug 8, 2023
5a4acd6
check start time
Hitenjain14 Aug 15, 2023
df361a6
parse form in parallel
Hitenjain14 Aug 15, 2023
dd2091d
fix err msg
Hitenjain14 Aug 15, 2023
1dd1680
don't parse for delete
Hitenjain14 Aug 15, 2023
7292feb
valid sig fix
Hitenjain14 Aug 15, 2023
85f0988
check form
Hitenjain14 Aug 16, 2023
050f611
Merge pull request #1212 from 0chain/sprint-july-4
Kishan-Dhakan Aug 16, 2023
c2320d9
fixed configs
dabasov Aug 16, 2023
c37441a
Merge pull request #1201 from 0chain/hm90121-patch-1
Kishan-Dhakan Aug 16, 2023
25b3c2d
rmv log
Hitenjain14 Aug 17, 2023
24289ee
change pending check
Hitenjain14 Aug 17, 2023
314dff8
use buf
Hitenjain14 Aug 17, 2023
5cf4cbd
rmv log
Hitenjain14 Aug 17, 2023
d0afe0f
Merge branch 'staging' of https://github.com/0chain/blobber into fix/…
Hitenjain14 Aug 17, 2023
df06603
add vt and mt calc on fly
Hitenjain14 Aug 18, 2023
2db6fe4
fix size calculation
Hitenjain14 Aug 18, 2023
c458849
check hasher
Hitenjain14 Aug 18, 2023
df045f7
merge sprint-1.10
Hitenjain14 Aug 19, 2023
bacc62a
copy from file
Hitenjain14 Aug 19, 2023
23c8fc0
allow read
Hitenjain14 Aug 19, 2023
e46a2c3
fix thumbnail write
Hitenjain14 Aug 19, 2023
09d87b4
fix unit test
Hitenjain14 Aug 21, 2023
34d1513
init hasher
Hitenjain14 Aug 21, 2023
b558d46
Merge branch 'sprint-1.10' of https://github.com/0chain/blobber into …
Hitenjain14 Aug 21, 2023
32cd0af
fix alloc unit test
Hitenjain14 Aug 22, 2023
cc8193e
check alloc root
Hitenjain14 Aug 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions code/go/0chain.net/blobbercore/allocation/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
)

var (
Expand All @@ -15,18 +17,23 @@ var (

var (
connectionObjSizeMap = make(map[string]*ConnectionObjSize)
connectionObjMutex sync.Mutex
connectionObjMutex sync.RWMutex
)

type ConnectionObjSize struct {
Size int64
UpdatedAt time.Time
Changes map[string]*ConnectionChanges
}

type ConnectionChanges struct {
Hasher *filestore.CommitHasher
}

// GetConnectionObjSize gets the connection size from the memory
func GetConnectionObjSize(connectionID string) int64 {
connectionObjMutex.Lock()
defer connectionObjMutex.Unlock()
connectionObjMutex.RLock()
defer connectionObjMutex.RUnlock()
connectionObjSize := connectionObjSizeMap[connectionID]
if connectionObjSize == nil {
return 0
Expand All @@ -43,6 +50,7 @@ func UpdateConnectionObjSize(connectionID string, addSize int64) {
connectionObjSizeMap[connectionID] = &ConnectionObjSize{
Size: addSize,
UpdatedAt: time.Now(),
Changes: make(map[string]*ConnectionChanges),
}
return
}
Expand All @@ -51,6 +59,34 @@ func UpdateConnectionObjSize(connectionID string, addSize int64) {
connectionObjSize.UpdatedAt = time.Now()
}

func GetHasher(connectionID, pathHash string) *filestore.CommitHasher {
connectionObjMutex.RLock()
defer connectionObjMutex.RUnlock()
connectionObj := connectionObjSizeMap[connectionID]
if connectionObj == nil {
return nil
}
if connectionObj.Changes[pathHash] == nil {
return nil
}
return connectionObj.Changes[pathHash].Hasher
}

func UpdateConnectionObjWithHasher(connectionID, pathHash string, hasher *filestore.CommitHasher) {
connectionObjMutex.Lock()
defer connectionObjMutex.Unlock()
connectionObj := connectionObjSizeMap[connectionID]
if connectionObj == nil {
connectionObjSizeMap[connectionID] = &ConnectionObjSize{
UpdatedAt: time.Now(),
Changes: make(map[string]*ConnectionChanges),
}
}
connectionObjSizeMap[connectionID].Changes[pathHash] = &ConnectionChanges{
Hasher: hasher,
}
}

// DeleteConnectionObjEntry remove the connectionID entry from map
// If the given connectionID is not present, then it is no-op.
func DeleteConnectionObjEntry(connectionID string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
)

// BaseFileChanger base file change processor
Expand Down Expand Up @@ -92,6 +93,10 @@ func (fc *BaseFileChanger) CommitToFileStore(ctx context.Context) error {
fileInputData.ValidationRoot = fc.ValidationRoot
fileInputData.FixedMerkleRoot = fc.FixedMerkleRoot
fileInputData.ChunkSize = fc.ChunkSize
fileInputData.Hasher = GetHasher(fc.ConnectionID, encryption.Hash(fc.Path))
if fileInputData.Hasher == nil {
return common.NewError("invalid_parameters", "Invalid parameters. Error getting hasher for commit.")
}
_, err := filestore.GetFileStore().CommitWrite(fc.AllocationID, fc.ConnectionID, fileInputData)
if err != nil {
return common.NewError("file_store_error", "Error committing to file store. "+err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/gosdk/constants"
"github.com/0chain/gosdk/core/zcncrypto"
Expand Down Expand Up @@ -98,6 +99,9 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) {
ctx := datastore.GetStore().CreateTransaction(context.TODO())

fPath := "/new"
hasher := filestore.GetNewCommitHasher(2310)
pathHash := encryption.Hash(fPath)
UpdateConnectionObjWithHasher("connection_id", pathHash, hasher)
change := &UploadFileChanger{
BaseFileChanger: BaseFileChanger{
Filename: filepath.Base(fPath),
Expand All @@ -107,6 +111,7 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) {
ValidationRoot: tc.validationRoot,
Size: 2310,
ChunkSize: 65536,
ConnectionID: "connection_id",
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/gosdk/core/zcncrypto"
"github.com/0chain/gosdk/zboxcore/client"
Expand Down Expand Up @@ -337,6 +338,9 @@ func TestBlobberCore_UpdateFile(t *testing.T) {
tc.setupDbMock()

ctx := datastore.GetStore().CreateTransaction(context.TODO())
hasher := filestore.GetNewCommitHasher(2310)
pathHash := encryption.Hash(tc.path)
UpdateConnectionObjWithHasher("connection_id", pathHash, hasher)

change := &UpdateFileChanger{
BaseFileChanger: BaseFileChanger{
Expand All @@ -352,6 +356,7 @@ func TestBlobberCore_UpdateFile(t *testing.T) {
ThumbnailSize: 92,
ChunkSize: 65536,
IsFinal: true,
ConnectionID: "connection_id",
},
}

Expand Down
35 changes: 16 additions & 19 deletions code/go/0chain.net/blobbercore/filestore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const (
)

func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, infile multipart.File) (*FileOutputData, error) {
tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, encryption.Hash(fileData.Path), conID)
tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, fileData.FilePathHash, conID)
var initialSize int64
finfo, err := os.Stat(tempFilePath)
if err != nil && !errors.Is(err, os.ErrNotExist) {
Expand All @@ -73,7 +73,7 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
return nil, common.NewError("dir_creation_error", err.Error())
}

f, err := os.OpenFile(tempFilePath, os.O_CREATE|os.O_WRONLY, 0644)
f, err := os.OpenFile(tempFilePath, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, common.NewError("file_open_error", err.Error())
}
Expand All @@ -88,6 +88,17 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
if err != nil {
return nil, common.NewError("file_write_error", err.Error())
}
if !fileData.IsThumbnail {
_, err = f.Seek(fileData.UploadOffset, io.SeekStart)
if err != nil {
return nil, common.NewError("file_seek_error", err.Error())
}

_, err = io.CopyBuffer(fileData.Hasher, f, buf)
if err != nil {
return nil, common.NewError("file_read_error", err.Error())
}
}

finfo, err = f.Stat()
if err != nil {
Expand Down Expand Up @@ -243,27 +254,18 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
}

fileSize := rStat.Size()
hasher := GetNewCommitHasher(fileSize)
bufSize := BufferSize
if fileSize < BufferSize {
bufSize = int(fileSize)
}
buffer := make([]byte, bufSize)
_, err = io.CopyBuffer(hasher, r, buffer)
if err != nil {
return false, common.NewError("read_write_error", err.Error())
}

err = hasher.Finalize()
if err != nil {
return false, common.NewError("finalize_error", err.Error())
}
fmtRootBytes, err := hasher.fmt.CalculateRootAndStoreNodes(f)
fmtRootBytes, err := fileData.Hasher.fmt.CalculateRootAndStoreNodes(f)
if err != nil {
return false, common.NewError("fmt_hash_calculation_error", err.Error())
}

validationRootBytes, err := hasher.vt.CalculateRootAndStoreNodes(f)
validationRootBytes, err := fileData.Hasher.vt.CalculateRootAndStoreNodes(f)
if err != nil {
return false, common.NewError("validation_hash_calculation_error", err.Error())
}
Expand All @@ -279,12 +281,7 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
"calculated validation root does not match with client's validation root")
}

_, err = r.Seek(0, io.SeekStart)
if err != nil {
return false, common.NewError("seek_error", err.Error())
}

_, err = io.Copy(f, r)
_, err = io.CopyBuffer(f, r, buffer)
if err != nil {
return false, common.NewError("write_error", err.Error())
}
Expand Down
6 changes: 4 additions & 2 deletions code/go/0chain.net/blobbercore/filestore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ type FileInputData struct {
//Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer.
UploadOffset int64
//IsFinal the request is final chunk
IsFinal bool
IsThumbnail bool
IsFinal bool
IsThumbnail bool
FilePathHash string
Hasher *CommitHasher
}

type FileOutputData struct {
Expand Down
31 changes: 22 additions & 9 deletions code/go/0chain.net/blobbercore/filestore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,23 +248,26 @@ func TestStoreStorageWriteAndCommit(t *testing.T) {
size := 640 * KB
validationRoot, fixedMerkleRoot, err := generateRandomData(fPath, int64(size))
require.Nil(t, err)

pathHash := encryption.Hash(test.remotePath)
hasher := GetNewCommitHasher(int64(size))
fid := &FileInputData{
Name: test.fileName,
Path: test.remotePath,
ValidationRoot: validationRoot,
FixedMerkleRoot: fixedMerkleRoot,
ChunkSize: 64 * KB,
FilePathHash: pathHash,
Hasher: hasher,
}

f, err := os.Open(fPath)
require.Nil(t, err)
defer f.Close()

_, err = fs.WriteFile(test.allocID, test.connID, fid, f)
require.Nil(t, err)
err = hasher.Finalize()
require.Nil(t, err)

pathHash := encryption.Hash(test.remotePath)
tempFilePath := fs.getTempPathForFile(test.allocID, test.fileName, pathHash, test.connID)
tF, err := os.Stat(tempFilePath)
require.Nil(t, err)
Expand Down Expand Up @@ -329,13 +332,16 @@ func TestDeletePreCommitDir(t *testing.T) {
size := 640 * KB
validationRoot, fixedMerkleRoot, err := generateRandomData(fPath, int64(size))
require.Nil(t, err)

pathHash := encryption.Hash(remotePath)
hasher := GetNewCommitHasher(int64(size))
fid := &FileInputData{
Name: fileName,
Path: remotePath,
ValidationRoot: validationRoot,
FixedMerkleRoot: fixedMerkleRoot,
ChunkSize: 64 * KB,
FilePathHash: pathHash,
Hasher: hasher,
}
// checkc if file to be uploaded exists
f, err := os.Open(fPath)
Expand All @@ -344,9 +350,9 @@ func TestDeletePreCommitDir(t *testing.T) {
_, err = fs.WriteFile(allocID, connID, fid, f)
require.Nil(t, err)
f.Close()

err = hasher.Finalize()
require.Nil(t, err)
// check if file is written to temp location
pathHash := encryption.Hash(remotePath)
tempFilePath := fs.getTempPathForFile(allocID, fileName, pathHash, connID)
tF, err := os.Stat(tempFilePath)
require.Nil(t, err)
Expand All @@ -368,6 +374,8 @@ func TestDeletePreCommitDir(t *testing.T) {

fid.ValidationRoot = validationRoot
fid.FixedMerkleRoot = fixedMerkleRoot
hasher = GetNewCommitHasher(int64(size))
fid.Hasher = hasher

// Write file to temp location
f, err = os.Open(fPath)
Expand All @@ -379,6 +387,8 @@ func TestDeletePreCommitDir(t *testing.T) {
tempFilePath = fs.getTempPathForFile(allocID, fileName, pathHash, connID)
_, err = os.Stat(tempFilePath)
require.Nil(t, err)
err = hasher.Finalize()
require.Nil(t, err)

success, err = fs.CommitWrite(allocID, connID, fid)
require.Nil(t, err)
Expand Down Expand Up @@ -422,13 +432,16 @@ func TestStorageUploadUpdate(t *testing.T) {
size := 640 * KB
validationRoot, fixedMerkleRoot, err := generateRandomData(fPath, int64(size))
require.Nil(t, err)

pathHash := encryption.Hash(remotePath)
hasher := GetNewCommitHasher(int64(size))
fid := &FileInputData{
Name: fileName,
Path: remotePath,
ValidationRoot: validationRoot,
FixedMerkleRoot: fixedMerkleRoot,
ChunkSize: 64 * KB,
FilePathHash: pathHash,
Hasher: hasher,
}
// checkc if file to be uploaded exists
f, err := os.Open(fPath)
Expand All @@ -437,9 +450,9 @@ func TestStorageUploadUpdate(t *testing.T) {
_, err = fs.WriteFile(allocID, connID, fid, f)
require.Nil(t, err)
f.Close()

err = hasher.Finalize()
require.Nil(t, err)
// check if file is written to temp location
pathHash := encryption.Hash(remotePath)
tempFilePath := fs.getTempPathForFile(allocID, fileName, pathHash, connID)
tF, err := os.Stat(tempFilePath)
require.Nil(t, err)
Expand Down
Loading
Loading