Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add/case 2 and case 4 #1196

Closed
wants to merge 50 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
1c11c89
Added challenge-timings-by-challengeId endpoint
Jayashsatolia403 May 31, 2023
bc2a083
Merge branch 'staging' into feature/get_challenge_timings_by_challeng…
Jayashsatolia403 May 31, 2023
4915414
Merge branch 'staging' into feature/get_challenge_timings_by_challeng…
Jayashsatolia403 Jun 3, 2023
98b7079
Merge branch 'staging' into feature/get_challenge_timings_by_challeng…
dabasov Jun 12, 2023
b23abb8
Merge branch 'staging' into feature/get_challenge_timings_by_challeng…
Jayashsatolia403 Jun 18, 2023
9b3d3d9
Removed print statement
Jayashsatolia403 Jun 18, 2023
197e0bb
Merge branch 'staging' into feature/get_challenge_timings_by_challeng…
Jayashsatolia403 Jun 21, 2023
2b93071
update challenge timing submission
Hitenjain14 Jun 22, 2023
8704965
Merge remote-tracking branch 'origin/fix/challenge-timing' into featu…
Jayashsatolia403 Jun 22, 2023
c724329
False commit
Jayashsatolia403 Jun 22, 2023
4068d40
added false commit swagger.yaml
shahnawaz-creator Jun 22, 2023
40b697e
fix createdAt in challenge timing table
Hitenjain14 Jun 25, 2023
3527681
Merge remote-tracking branch 'origin/fix/challenge-timing' into featu…
Jayashsatolia403 Jun 25, 2023
7b67bf0
Merge remote-tracking branch 'origin/feature/get_challenge_timings_by…
Jayashsatolia403 Jun 25, 2023
e08f2f5
Add path in thumbnail hash (#1098)
Hitenjain14 Jul 4, 2023
3a3dfd5
Dep/update (#1153)
dabasov Jul 4, 2023
df08ba8
Merge branch 'staging' into sprint-july-1
Kishan-Dhakan Jul 4, 2023
fe1ac2f
Remove fileID from fileMetaHash (#1114)
Hitenjain14 Jul 4, 2023
365477c
optimize image (#1148)
Manali-Jain-squareops Jul 6, 2023
bdcd4f2
Merge branch 'sprint-july-1' into feature/get_challenge_timings_by_ch…
Jayashsatolia403 Jul 6, 2023
dd85148
once for logger init
dabasov Jul 4, 2023
240399d
Update challenge timing submission (#1140)
Hitenjain14 Jul 6, 2023
51bc27a
Merge branch 'sprint-july-1' into feature/get_challenge_timings_by_ch…
Jayashsatolia403 Jul 6, 2023
854f859
Fix blobber size (#1163)
Jayashsatolia403 Jul 8, 2023
c8de907
Send WM commit status to conductor
lpoli Jul 11, 2023
8e99dbf
Rename file
lpoli Jul 12, 2023
5348065
Merge branch 'staging' into add/conductor-tests-for-blobber-and-valid…
lpoli Jul 16, 2023
815572a
Merge branch 'sprint-july-1' into feature/get_challenge_timings_by_ch…
boddumanohar Jul 23, 2023
71aaa87
Modify log type
lpoli Jul 25, 2023
8e765d0
Add state for stopping blobber from committing WM
lpoli Jul 29, 2023
fcf9d04
Cache Allocation to reduce DB load (#1146)
Jayashsatolia403 Jul 29, 2023
a71c6b5
use repo save method
Hitenjain14 Jul 30, 2023
86a8d46
use fresh context for new txn
Hitenjain14 Jul 30, 2023
d1f9a55
rollback changes
Hitenjain14 Jul 30, 2023
c660007
cleanup
Hitenjain14 Jul 31, 2023
dda6e7c
Merge pull request #1190 from 0chain/fix/save-cache
Kishan-Dhakan Jul 31, 2023
6452513
Merge remote-tracking branch 'origin/sprint-july-4' into feature/get_…
Jayashsatolia403 Jul 31, 2023
c86175c
Fix lint
Jayashsatolia403 Jul 31, 2023
c5b0ca5
Removed unnecessary file changes
Jayashsatolia403 Jul 31, 2023
4dac6aa
Merge pull request #1107 from 0chain/feature/get_challenge_timings_by…
Kishan-Dhakan Jul 31, 2023
831c416
check if challenge exists (#1188)
Hitenjain14 Jul 31, 2023
bce9c45
Send file meta root to rpc server
lpoli Aug 3, 2023
c5a3058
Export function to use it in conductor client
lpoli Aug 4, 2023
8a6aeed
Modify field type
lpoli Aug 4, 2023
7dd0419
Add code to get file meta root
lpoli Aug 4, 2023
efc4535
Modify file meta root retrieval
lpoli Aug 4, 2023
5eb0285
Add field
lpoli Aug 4, 2023
416965a
Apply rename for conductor test based on conductor state
lpoli Aug 4, 2023
66319cf
Add comment
lpoli Aug 4, 2023
73ded20
Merge branch 'sprint-july-4' into add/case-2-and-case-4
lpoli Aug 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ concurrency:

on:
push:
branches:
- master
- staging
tags:
branches: [ master, staging, sprint* ]
pull_request:

jobs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,7 @@ func TestBlobberCore_CopyFile(t *testing.T) {

config.Configuration.MaxAllocationDirFiles = tc.maxDirFilesPerAlloc

ctx := context.TODO()
db := datastore.GetStore().GetDB().Begin()
ctx = context.WithValue(ctx, datastore.ContextKeyTransaction, db)
ctx := datastore.GetStore().CreateTransaction(context.TODO())

change := &CopyFileChange{
AllocationID: tc.allocationID,
Expand Down
19 changes: 6 additions & 13 deletions code/go/0chain.net/blobbercore/allocation/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package allocation

import (
"context"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/errors"
Expand All @@ -12,33 +13,25 @@ import (
// GetOrCreate, get allocation if it exists in db. if not, try to sync it from blockchain, and insert it in db.
func GetOrCreate(ctx context.Context, store datastore.Store, allocationId string) (*Allocation, error) {

db := store.GetDB()
db := store.CreateTransaction(ctx)

if len(allocationId) == 0 {
return nil, errors.Throw(constants.ErrInvalidParameter, "tx")
}

alloc := &Allocation{}
result := db.Table(TableNameAllocation).Where(SQLWhereGetById, allocationId).First(alloc)

if result.Error == nil {
alloc, err := Repo.GetById(db, allocationId)
if err == nil {
return alloc, nil
}
if !errors.Is(err, gorm.ErrRecordNotFound) {

if !errors.Is(result.Error, gorm.ErrRecordNotFound) {

return nil, errors.ThrowLog(result.Error.Error(), common.ErrBadDataStore)
return nil, errors.ThrowLog(err.Error(), common.ErrBadDataStore)
}

return SyncAllocation(allocationId)

}

const (
SQLWhereGetByTx = "allocations.tx = ?"
SQLWhereGetById = "allocations.id = ?"
)

// DryRun Creates a prepared statement when executing any SQL and caches them to speed up future calls
// https://gorm.io/docs/performance.html#Caches-Prepared-Statement
func DryRun(db *gorm.DB) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,7 @@ func TestBlobberCore_DeleteFile(t *testing.T) {

config.Configuration.MaxAllocationDirFiles = tc.maxDirFilesPerAlloc

ctx := context.TODO()
db := datastore.GetStore().GetDB().Begin()
ctx = context.WithValue(ctx, datastore.ContextKeyTransaction, db)
ctx := datastore.GetStore().CreateTransaction(context.TODO())

change := &DeleteFileChange{
AllocationID: tc.allocationID,
Expand Down
31 changes: 20 additions & 11 deletions code/go/0chain.net/blobbercore/allocation/entity.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package allocation

import (
"context"
"errors"
"fmt"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"gorm.io/gorm/clause"

Expand Down Expand Up @@ -162,7 +164,8 @@ func (*Pending) TableName() string {
}

// GetPendingWrite Get write size that is not yet redeemed
func GetPendingWrite(db *gorm.DB, clientID, allocationID string) (pendingWriteSize int64, err error) {
func GetPendingWrite(ctx context.Context, clientID, allocationID string) (pendingWriteSize int64, err error) {
db := datastore.GetStore().GetTransaction(ctx)
err = db.Model(&Pending{}).Select("pending_write").Where(
"id=?", fmt.Sprintf("%v:%v", clientID, allocationID),
).Scan(&pendingWriteSize).Error
Expand All @@ -177,7 +180,8 @@ func GetPendingWrite(db *gorm.DB, clientID, allocationID string) (pendingWriteSi
}

// GetPendingRead Get read size that is not yet redeemed
func GetPendingRead(db *gorm.DB, clientID, allocationID string) (pendingReadSize int64, err error) {
func GetPendingRead(ctx context.Context, clientID, allocationID string) (pendingReadSize int64, err error) {
db := datastore.GetStore().GetTransaction(ctx)
err = db.Model(&Pending{}).Select("pending_read").Where(
"id=?", fmt.Sprintf("%v:%v", clientID, allocationID),
).Scan(&pendingReadSize).Error
Expand All @@ -191,7 +195,8 @@ func GetPendingRead(db *gorm.DB, clientID, allocationID string) (pendingReadSize
return
}

func AddToPending(db *gorm.DB, clientID, allocationID string, pendingWrite int64) (err error) {
func AddToPending(ctx context.Context, clientID, allocationID string, pendingWrite int64) (err error) {
db := datastore.GetStore().GetTransaction(ctx)
key := clientID + ":" + allocationID
// Lock is required because two process can simultaneously call this function and read pending data
// thus giving same value leading to inconsistent data
Expand All @@ -215,7 +220,8 @@ func AddToPending(db *gorm.DB, clientID, allocationID string, pendingWrite int64
return nil
}

func GetWritePoolsBalance(db *gorm.DB, allocationID string) (balance uint64, err error) {
func GetWritePoolsBalance(ctx context.Context, allocationID string) (balance uint64, err error) {
db := datastore.GetStore().GetTransaction(ctx)
err = db.Model(&WritePool{}).Select("COALESCE(SUM(balance),0) as tot_balance").Where(
"allocation_id = ?", allocationID,
).Scan(&balance).Error
Expand Down Expand Up @@ -266,36 +272,39 @@ func (WritePool) TableName() string {
return "write_pools"
}

func GetReadPool(db *gorm.DB, clientID string) (*ReadPool, error) {
func GetReadPool(ctx context.Context, clientID string) (*ReadPool, error) {
db := datastore.GetStore().GetTransaction(ctx)
var rp ReadPool
return &rp, db.Model(&ReadPool{}).Where("client_id = ?", clientID).Scan(&rp).Error
}

func GetReadPoolsBalance(db *gorm.DB, clientID string) (int64, error) {
rp, err := GetReadPool(db, clientID)
func GetReadPoolsBalance(ctx context.Context, clientID string) (int64, error) {
rp, err := GetReadPool(ctx, clientID)
if err != nil {
return 0, err
}

return rp.Balance, nil
}

func UpsertReadPool(db *gorm.DB, rp *ReadPool) error {
func UpsertReadPool(ctx context.Context, rp *ReadPool) error {
updateFields := []string{"balance"}

db := datastore.GetStore().GetTransaction(ctx)
return db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "client_id"}},
DoUpdates: clause.AssignmentColumns(updateFields), // column needed to be updated
}).Create(&rp).Error
}

func UpdateReadPool(db *gorm.DB, rp *ReadPool) error {
func UpdateReadPool(ctx context.Context, rp *ReadPool) error {
db := datastore.GetStore().GetTransaction(ctx)
return db.Model(&ReadPool{}).Where("client_id = ?", rp.ClientID).Updates(map[string]interface{}{
"balance": rp.Balance,
}).Error
}

func SetWritePool(db *gorm.DB, allocationID string, wp *WritePool) (err error) {
func SetWritePool(ctx context.Context, allocationID string, wp *WritePool) (err error) {
db := datastore.GetStore().GetTransaction(ctx)
err = db.Delete(&WritePool{}, "allocation_id = ?", allocationID).Error
if err != nil {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,7 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) {

config.Configuration.MaxAllocationDirFiles = tc.maxDirFilesPerAlloc

ctx := context.TODO()
db := datastore.GetStore().GetDB().Begin()
ctx = context.WithValue(ctx, datastore.ContextKeyTransaction, db)
ctx := datastore.GetStore().CreateTransaction(context.TODO())

fPath := "/new"
change := &UploadFileChanger{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,7 @@ func TestBlobberCore_MoveFile(t *testing.T) {

config.Configuration.MaxAllocationDirFiles = tc.maxDirFilesPerAlloc

ctx := context.TODO()
db := datastore.GetStore().GetDB().Begin()
ctx = context.WithValue(ctx, datastore.ContextKeyTransaction, db)
ctx := datastore.GetStore().CreateTransaction(context.TODO())

change := &MoveFileChange{
AllocationID: tc.allocationID,
Expand Down
4 changes: 1 addition & 3 deletions code/go/0chain.net/blobbercore/allocation/multiop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ func TestMultiOp(t *testing.T) {
alloc.OwnerPublicKey = sch.GetPublicKey()
alloc.OwnerID = client.GetClientID()
datastore.MocketTheStore(t, true)
ctx := context.TODO()
db := datastore.GetStore().GetDB().Begin()
ctx = context.WithValue(ctx, datastore.ContextKeyTransaction, db)
ctx := datastore.GetStore().CreateTransaction(context.TODO())
setupDbMock()
fileIDMeta := make(map[string]string)
fileIDMeta["/"] = randName()
Expand Down
67 changes: 29 additions & 38 deletions code/go/0chain.net/blobbercore/allocation/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,12 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/blobber/code/go/0chain.net/core/node"
"github.com/0chain/blobber/code/go/0chain.net/core/transaction"
"go.uber.org/zap"
"gorm.io/gorm"
)

// GetAllocationByID from DB. This function doesn't load related terms.
func GetAllocationByID(ctx context.Context, allocID string) (a *Allocation, err error) {
var tx = datastore.GetStore().GetTransaction(ctx)

a = new(Allocation)
err = tx.Model(&Allocation{}).
Where("id=?", allocID).
First(a).Error
return
return Repo.GetById(ctx, allocID)
}

// LoadTerms loads corresponding terms from DB. Since, the GetAllocationByID
Expand All @@ -49,14 +42,11 @@ func (a *Allocation) LoadTerms(ctx context.Context) (err error) {
return // found in DB
}

// VerifyAllocationTransaction try to get allocation from postgres.if it doesn't exists, get it from sharders, and insert it into postgres.
// FetchAllocationFromEventsDB try to get allocation from postgres.if it doesn't exists, get it from sharders, and insert it into postgres.
func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, allocationTx string, readonly bool) (a *Allocation, err error) {
var tx = datastore.GetStore().GetTransaction(ctx)

a = new(Allocation)
err = tx.Model(&Allocation{}).
Where(&Allocation{Tx: allocationTx}).
First(a).Error
a, err = Repo.GetByTx(ctx, allocationID, allocationTx)

if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
return nil, common.NewError("bad_db_operation", err.Error()) // unexpected DB error
Expand All @@ -81,21 +71,13 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc
}

var isExist bool
err = tx.Model(&Allocation{}).
Where("id = ?", sa.ID).
First(a).Error
a, err = Repo.GetById(ctx, sa.ID)
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
return nil, common.NewError("bad_db_operation", err.Error()) // unexpected
}

isExist = a.ID != ""

logging.Logger.Info("VerifyAllocationTransaction",
zap.Bool("isExist", isExist),
zap.Any("allocation", a),
zap.Any("storageAllocation", sa),
zap.String("node.Self.ID", node.Self.ID))

if !isExist {
foundBlobber := false
for _, blobberConnection := range sa.BlobberDetails {
Expand Down Expand Up @@ -136,6 +118,7 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc
if err != nil {
return nil, common.NewError("meta_data_update_error", err.Error())
}

// go update allocation data in file store map
// related terms
a.Terms = make([]*Terms, 0, len(sa.BlobberDetails))
Expand All @@ -154,7 +137,8 @@ func FetchAllocationFromEventsDB(ctx context.Context, allocationID string, alloc

logging.Logger.Info("Saving the allocation to DB")

err = tx.Save(a).Error
err = Repo.Save(ctx, a)

if err != nil {
return nil, err
}
Expand All @@ -181,13 +165,16 @@ func RequestReadPoolStat(clientID string) (*ReadPool, error) {
return nil, fmt.Errorf("requesting read pools stat: %v", err)
}

var readPool ReadPool
if err = json.Unmarshal(resp, &readPool); err != nil {
return nil, fmt.Errorf("decoding read pools stat response: %v, \n==> resp: %s", err, string(resp))
if resp != nil {
var readPool ReadPool
if err = json.Unmarshal(resp, &readPool); err != nil {
return nil, fmt.Errorf("decoding read pools stat response: %v, \n==> resp: %s", err, string(resp))
}
readPool.ClientID = clientID
return &readPool, nil
}

readPool.ClientID = clientID
return &readPool, nil
return nil, errors.New("empty response received from MakeSCRestAPICall")
}

func RequestWritePool(allocationID string) (wps *WritePool, err error) {
Expand All @@ -205,16 +192,20 @@ func RequestWritePool(allocationID string) (wps *WritePool, err error) {
return nil, fmt.Errorf("requesting write pools stat: %v", err)
}

var allocation = struct {
ID string `json:"id"`
WritePool uint64 `json:"write_pool"`
}{}
if err = json.Unmarshal(resp, &allocation); err != nil {
return nil, fmt.Errorf("decoding write pools stat response: %v", err)
if resp != nil {
var allocation = struct {
ID string `json:"id"`
WritePool uint64 `json:"write_pool"`
}{}
if err = json.Unmarshal(resp, &allocation); err != nil {
return nil, fmt.Errorf("decoding write pools stat response: %v", err)
}

return &WritePool{
AllocationID: allocationID,
Balance: allocation.WritePool,
}, nil
}

return &WritePool{
AllocationID: allocationID,
Balance: allocation.WritePool,
}, nil
return nil, errors.New("empty response received from MakeSCRestAPICall")
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (rf *RenameFileChange) DeleteTempFile() error {
return nil
}

func (rf *RenameFileChange) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
func (rf *RenameFileChange) applyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
allocationRoot string, ts common.Timestamp, _ map[string]string) (*reference.Ref, error) {

if rf.Path == "/" {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//go:build integration_tests
// +build integration_tests

package allocation

import (
"context"
"errors"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/conductor/conductrpc"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/node"
)

func (rf *RenameFileChange) ApplyChange(ctx context.Context, rootRef *reference.Ref, change *AllocationChange,
allocationRoot string, ts common.Timestamp, _ map[string]string) (*reference.Ref, error) {

state := conductrpc.Client().State()
if state.FailRenameCommit != nil {
for _, nodeId := range state.FailRenameCommit {
if nodeId == node.Self.ID {
return nil, errors.New("error directed by conductor")
}
}
}
return rf.applyChange(ctx, rootRef, change, allocationRoot, ts, nil)
}
Loading