Skip to content

Commit

Permalink
add async processing
Browse files Browse the repository at this point in the history
  • Loading branch information
Hitenjain14 committed Aug 26, 2023
1 parent 365093a commit 13def25
Show file tree
Hide file tree
Showing 10 changed files with 465 additions and 319 deletions.
261 changes: 232 additions & 29 deletions code/go/0chain.net/blobbercore/allocation/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"sync"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
)

var (
Expand All @@ -16,93 +18,294 @@ var (
)

var (
connectionObjSizeMap = make(map[string]*ConnectionObjSize)
connectionObjMutex sync.RWMutex
connectionProcessor = make(map[string]*ConnectionProcessor)
connectionObjMutex sync.RWMutex
)

type ConnectionObjSize struct {
Size int64
UpdatedAt time.Time
Changes map[string]*ConnectionChanges
type ConnectionProcessor struct {
Size int64
UpdatedAt time.Time
AllocationObj *Allocation
changes map[string]*ConnectionChange
ClientID string
}

type ConnectionChanges struct {
Hasher *filestore.CommitHasher
type ConnectionChange struct {
hasher *filestore.CommitHasher
BaseChanger *BaseFileChanger
processError error
ProcessChan chan FileCommand
wg sync.WaitGroup
isFinalized bool
}

func CreateConnectionChange(connectionID, pathHash string) *ConnectionChange {
connectionObjMutex.Lock()
defer connectionObjMutex.Unlock()
connectionObj := connectionProcessor[connectionID]
if connectionObj == nil {
connectionObj = &ConnectionProcessor{
UpdatedAt: time.Now(),
changes: make(map[string]*ConnectionChange),
}
connectionProcessor[connectionID] = connectionObj
}
connChange := &ConnectionChange{
ProcessChan: make(chan FileCommand, 2),
}
connectionProcessor[connectionID].changes[pathHash] = connChange
connChange.wg.Add(1)
go func() {
processCommand(connChange.ProcessChan, connectionObj.AllocationObj, connectionID, connectionObj.ClientID)
connChange.wg.Done()
}()
return connectionProcessor[connectionID].changes[pathHash]
}

func GetConnectionChange(connectionID, pathHash string) *ConnectionChange {
connectionObjMutex.RLock()
defer connectionObjMutex.RUnlock()
connectionObj := connectionProcessor[connectionID]
if connectionObj == nil {
return nil
}
return connectionProcessor[connectionID].changes[pathHash]
}

func GetFileChanger(connectionID, pathHash string) *BaseFileChanger {
connectionObjMutex.RLock()
defer connectionObjMutex.RUnlock()
connectionObj := connectionProcessor[connectionID]
if connectionObj == nil {
return nil
}
if connectionObj.changes[pathHash] == nil {
return nil
}
return connectionObj.changes[pathHash].BaseChanger
}

func SaveFileChanger(connectionID string, fileChanger *BaseFileChanger) {
connectionObjMutex.Lock()
defer connectionObjMutex.Unlock()
connectionObj := connectionProcessor[connectionID]
if connectionObj == nil {
connectionProcessor[connectionID] = &ConnectionProcessor{
UpdatedAt: time.Now(),
changes: make(map[string]*ConnectionChange),
}
}
connectionProcessor[connectionID].changes[fileChanger.PathHash].BaseChanger = fileChanger
}

func SetFinalized(connectionID, pathHash string, cmd FileCommand) error {
connectionObjMutex.Lock()
connectionObj := connectionProcessor[connectionID]
if connectionObj == nil {
connectionObjMutex.Unlock()
return common.NewError("connection_not_found", "connection not found")
}
connChange := connectionProcessor[connectionID].changes[pathHash]
if connChange.isFinalized {
connectionObjMutex.Unlock()
return common.NewError("connection_change_finalized", "connection change finalized")
}
connChange.isFinalized = true
connectionObjMutex.Unlock()
connChange.ProcessChan <- cmd
close(connChange.ProcessChan)
connChange.wg.Wait()
return GetError(connectionID, pathHash)
}

func SendCommand(connectionID, pathHash string, cmd FileCommand) error {
connectionObjMutex.RLock()
connectionObj := connectionProcessor[connectionID]
if connectionObj == nil {
connectionObjMutex.RUnlock()
return common.NewError("connection_not_found", "connection not found")
}
connChange := connectionProcessor[connectionID].changes[pathHash]
if connChange == nil {
return common.NewError("connection_change_not_found", "connection change not found")
}
if connChange.isFinalized {
return common.NewError("connection_change_finalized", "connection change finalized")
}
connectionObjMutex.RUnlock()
connChange.ProcessChan <- cmd
return nil
}

func GetConnectionProcessor(connectionID string) *ConnectionProcessor {
connectionObjMutex.RLock()
defer connectionObjMutex.RUnlock()
connectionObj := connectionProcessor[connectionID]
if connectionObj == nil {
return nil
}
return connectionProcessor[connectionID]
}

func CreateConnectionProcessor(connectionID string) *ConnectionProcessor {
connectionObjMutex.Lock()
defer connectionObjMutex.Unlock()
connectionObj := connectionProcessor[connectionID]
if connectionObj == nil {
connectionProcessor[connectionID] = &ConnectionProcessor{
UpdatedAt: time.Now(),
changes: make(map[string]*ConnectionChange),
}
}
return connectionProcessor[connectionID]
}

func SetError(connectionID, pathHash string, err error) {
connectionObjMutex.Lock()
defer connectionObjMutex.Unlock()
connectionObj := connectionProcessor[connectionID]
if connectionObj == nil {
return
}
connChange := connectionProcessor[connectionID].changes[pathHash]
connChange.processError = err
drainChan(connChange.ProcessChan) // drain the channel so that the no commands are blocked
}

func GetError(connectionID, pathHash string) error {
connectionObjMutex.RLock()
defer connectionObjMutex.RUnlock()
connectionObj := connectionProcessor[connectionID]
if connectionObj == nil {
return nil
}
connChange := connectionProcessor[connectionID].changes[pathHash]
if connChange == nil {
return nil
}
return connChange.processError
}

// GetConnectionObjSize gets the connection size from the memory
func GetConnectionObjSize(connectionID string) int64 {
connectionObjMutex.RLock()
defer connectionObjMutex.RUnlock()
connectionObjSize := connectionObjSizeMap[connectionID]
if connectionObjSize == nil {
connectionObj := connectionProcessor[connectionID]
if connectionObj == nil {
return 0
}
return connectionObjSizeMap[connectionID].Size
return connectionProcessor[connectionID].Size
}

// UpdateConnectionObjSize updates the connection size by addSize in memory
func UpdateConnectionObjSize(connectionID string, addSize int64) {
connectionObjMutex.Lock()
defer connectionObjMutex.Unlock()
connectionObjSize := connectionObjSizeMap[connectionID]
if connectionObjSize == nil {
connectionObjSizeMap[connectionID] = &ConnectionObjSize{
connectionObj := connectionProcessor[connectionID]
if connectionObj == nil {
connectionProcessor[connectionID] = &ConnectionProcessor{
Size: addSize,
UpdatedAt: time.Now(),
Changes: make(map[string]*ConnectionChanges),
changes: make(map[string]*ConnectionChange),
}
return
}

connectionObjSize.Size = connectionObjSize.Size + addSize
connectionObjSize.UpdatedAt = time.Now()
connectionObj.Size = connectionObj.Size + addSize
connectionObj.UpdatedAt = time.Now()
}

func GetHasher(connectionID, pathHash string) *filestore.CommitHasher {
connectionObjMutex.RLock()
defer connectionObjMutex.RUnlock()
connectionObj := connectionObjSizeMap[connectionID]
connectionObj := connectionProcessor[connectionID]
if connectionObj == nil {
return nil
}
if connectionObj.Changes[pathHash] == nil {
if connectionObj.changes[pathHash] == nil {
return nil
}
return connectionObj.Changes[pathHash].Hasher
return connectionObj.changes[pathHash].hasher
}

func UpdateConnectionObjWithHasher(connectionID, pathHash string, hasher *filestore.CommitHasher) {
connectionObjMutex.Lock()
defer connectionObjMutex.Unlock()
connectionObj := connectionObjSizeMap[connectionID]
connectionObj := connectionProcessor[connectionID]
if connectionObj == nil {
connectionObjSizeMap[connectionID] = &ConnectionObjSize{
connectionProcessor[connectionID] = &ConnectionProcessor{
UpdatedAt: time.Now(),
Changes: make(map[string]*ConnectionChanges),
changes: make(map[string]*ConnectionChange),
}
}
connectionObjSizeMap[connectionID].Changes[pathHash] = &ConnectionChanges{
Hasher: hasher,
connectionProcessor[connectionID].changes[pathHash] = &ConnectionChange{
hasher: hasher,
}
}

func processCommand(processorChan chan FileCommand, allocationObj *Allocation, connectionID, clientID string) {
for {

Check failure on line 247 in code/go/0chain.net/blobbercore/allocation/connection.go

View workflow job for this annotation

GitHub Actions / Lints

S1000: should use for range instead of for { select {} } (gosimple)
select {
case cmd, ok := <-processorChan:
if !ok || cmd == nil {
return
}
res, err := cmd.ProcessContent(allocationObj)
if err != nil {
SetError(connectionID, cmd.GetPath(), err)
return
}
err = cmd.ProcessThumbnail(allocationObj)
if err != nil {
SetError(connectionID, cmd.GetPath(), err)
return
}
if res.IsFinal {
err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error {
connectionObj, err := GetAllocationChanges(ctx, connectionID, allocationObj.ID, clientID)
if err != nil {
return err
}
return cmd.UpdateChange(ctx, connectionObj)
})
if err != nil {
SetError(connectionID, cmd.GetPath(), err)
}
return
}
}
}
}

func drainChan(processorChan chan FileCommand) {
for {
select {
case _, ok := <-processorChan:
if !ok {
return
}
default:
return
}
}
}

// DeleteConnectionObjEntry remove the connectionID entry from map
// If the given connectionID is not present, then it is no-op.
func DeleteConnectionObjEntry(connectionID string) {
connectionObjMutex.Lock()
delete(connectionObjSizeMap, connectionID)
delete(connectionProcessor, connectionID)
connectionObjMutex.Unlock()
}

// cleanConnectionObj cleans the connectionObjSize map. It deletes the rows
// for which deadline is exceeded.
func cleanConnectionObj() {
connectionObjMutex.Lock()
for connectionID, connectionObjSize := range connectionObjSizeMap {
diff := time.Since(connectionObjSize.UpdatedAt)
for connectionID, connectionObj := range connectionProcessor {
diff := time.Since(connectionObj.UpdatedAt)
if diff >= ConnectionObjTimeout {
delete(connectionObjSizeMap, connectionID)
delete(connectionProcessor, connectionID)
}
}
connectionObjMutex.Unlock()
Expand Down
38 changes: 38 additions & 0 deletions code/go/0chain.net/blobbercore/allocation/file_changer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package allocation

import (
"context"
"net/http"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
)
Expand Down Expand Up @@ -56,6 +58,42 @@ type BaseFileChanger struct {
ChunkEndIndex int `json:"chunk_end_index,omitempty"` // end index of chunks. all chunks MUST be uploaded one by one because of CompactMerkleTree
ChunkHash string `json:"chunk_hash,omitempty"`
UploadOffset int64 `json:"upload_offset,omitempty"` // It is next position that new incoming chunk should be append to
PathHash string `json:"-"` // hash of path
}

// swagger:model UploadResult
type UploadResult struct {
Filename string `json:"filename"`
Size int64 `json:"size"`
Hash string `json:"hash"`
ValidationRoot string `json:"validation_root"`
FixedMerkleRoot string `json:"fixed_merkle_root"`

// UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer.
UploadLength int64 `json:"upload_length"`
// Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer.
UploadOffset int64 `json:"upload_offset"`
IsFinal bool `json:"-"`
}

type FileCommand interface {

// GetExistingFileRef get file ref if it exists
GetExistingFileRef() *reference.Ref

GetPath() string

// IsValidated validate request, and try build ChangeProcesser instance
IsValidated(ctx context.Context, req *http.Request, allocationObj *Allocation, clientID string) error

// ProcessContent flush file to FileStorage
ProcessContent(allocationObj *Allocation) (UploadResult, error)

// ProcessThumbnail flush thumbnail file to FileStorage if it has.
ProcessThumbnail(allocationObj *Allocation) error

// UpdateChange update AllocationChangeProcessor. It will be president in db for committing transcation
UpdateChange(ctx context.Context, connectionObj *AllocationChangeCollector) error
}

func (fc *BaseFileChanger) DeleteTempFile() error {
Expand Down
Loading

0 comments on commit 13def25

Please sign in to comment.