Skip to content

Commit

Permalink
Merge pull request #343 from teharrison/master
Browse files Browse the repository at this point in the history
locking and saving
  • Loading branch information
teharrison authored Apr 16, 2018
2 parents 053e7c5 + 87889d3 commit 6b43e3a
Show file tree
Hide file tree
Showing 10 changed files with 324 additions and 275 deletions.
5 changes: 3 additions & 2 deletions shock-server/controller/node/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,9 @@ func IndexTypedRequest(ctx context.Context) {
return
}

if err := n.SetIndexInfo(idxType, idxInfo); err != nil {
logger.Error("[email protected]: " + err.Error())
n.SetIndexInfo(idxType, idxInfo)
if err := n.Save(); err != nil {
logger.Error("[email protected]: " + err.Error())
}

if conf.LOG_PERF {
Expand Down
2 changes: 1 addition & 1 deletion shock-server/controller/node/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (cr *NodeController) Replace(id string, ctx context.Context) error {
}
}

err = n.Update(params, files)
err = n.Update(params, files, false)
if err != nil {
err_msg := "err@node_Update: " + id + ": " + err.Error()
logger.Error(err_msg)
Expand Down
4 changes: 2 additions & 2 deletions shock-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func mapRoutes() {
return nil
})

goweb.Map("/openparts", func(ctx context.Context) error {
ids := node.LockMgr.GetNodes()
goweb.Map("/locked", func(ctx context.Context) error {
ids := node.LockMgr.GetLocked()
return responder.RespondWithData(ctx, ids)
})

Expand Down
2 changes: 2 additions & 0 deletions shock-server/node/expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func (nr *NodeReaper) Handle() {
logger.Error(err_msg)
}
}
// remove old nodes from Locker, value is hours old
LockMgr.RemoveOldNodes(1)
}
}

Expand Down
6 changes: 3 additions & 3 deletions shock-server/node/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func (node *Node) SetFile(file FormFile) (err error) {
if err != nil {
return
}

os.Rename(file.Path, node.FilePath())
node.File.Name = file.Name
node.File.Size = fileStat.Size()
Expand All @@ -38,7 +39,7 @@ func (node *Node) SetFile(file FormFile) (err error) {
Format: "dynamic",
CreatedOn: time.Now(),
}
err = node.Save()

return
}

Expand Down Expand Up @@ -115,7 +116,6 @@ func (node *Node) SetFileFromSubset(subsetIndices FormFile) (err error) {
CreatedOn: time.Now(),
}

err = node.Save()
return
}

Expand Down Expand Up @@ -210,7 +210,7 @@ func (node *Node) SetFileFromPath(path string, action string) (err error) {
} else {
node.File.Path = path
}
err = node.Save()

return
}

Expand Down
105 changes: 105 additions & 0 deletions shock-server/node/locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package node

import (
"fmt"
"sync"
"time"
)

var (
LockMgr = NewLocker()
)

func NewLocker() *Locker {
return &Locker{
nodes: map[string]*NodeLock{},
}
}

type Locker struct {
nodes map[string]*NodeLock
sync.Mutex
}

type NodeLock struct {
isLocked bool
updated time.Time
writeLock chan int
}

func (n *NodeLock) init() {
n.isLocked = false
n.updated = time.Now()
n.writeLock = make(chan int, 1)
n.writeLock <- 1 // Put the initial value into the channel
}

func (n *NodeLock) lock(id string) (err error) {
select {
case <-n.writeLock: // Grab the ticket - here is where we wait
case <-time.After(time.Minute * 30):
err = fmt.Errorf("Timeout!! Waited 30 mins on lock for node %s", id)
return
}
n.isLocked = true
n.updated = time.Now()
return
}

func (n *NodeLock) unlock() {
n.isLocked = false
n.updated = time.Now()
n.writeLock <- 1 // Release the ticket
}

func (l *Locker) LockNode(id string) (err error) {
// add if missing, may happen if shock restarted
if _, ok := l.nodes[id]; !ok {
l.AddNode(id)
}
err = l.nodes[id].lock(id)
return
}

func (l *Locker) UnlockNode(id string) {
// skip missing id
if _, ok := l.nodes[id]; ok {
l.nodes[id].unlock()
}
}

func (l *Locker) GetLocked() (ids []string) {
l.Lock()
for id, n := range l.nodes {
if n.isLocked {
ids = append(ids, id)
}
}
l.Unlock()
return
}

func (l *Locker) AddNode(id string) {
l.Lock()
l.nodes[id] = new(NodeLock)
l.nodes[id].init()
l.Unlock()
}

func (l *Locker) RemoveNode(id string) {
l.Lock()
delete(l.nodes, id)
l.Unlock()
}

func (l *Locker) RemoveOldNodes(hours int) {
currTime := time.Now()
expireTime := currTime.Add(time.Duration(hours*-1) * time.Hour)
l.Lock()
for id, n := range l.nodes {
if (!n.isLocked) && n.updated.Before(expireTime) {
delete(l.nodes, id)
}
}
l.Unlock()
}
75 changes: 37 additions & 38 deletions shock-server/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"io/ioutil"
"os"
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -130,13 +131,13 @@ func CreateNodeUpload(u *user.User, params map[string]string, files FormFiles) (
return
}

err = node.Update(params, files)
// update saves node
err = node.Update(params, files, true)
if err != nil {
err = fmt.Errorf("(node.Update) %s", err.Error())
node.Rmdir()
return
}

err = node.Save()
return
}

Expand Down Expand Up @@ -219,9 +220,9 @@ func CreateNodesFromArchive(u *user.User, params map[string]string, files FormFi

// save nodes, only return those that were created / saved
for _, n := range tempNodes {
if err = n.Save(); err != nil {
if serr := n.Save(); serr != nil {
n.Rmdir()
return nil, err
continue
}
nodes = append(nodes, n)
}
Expand Down Expand Up @@ -264,6 +265,13 @@ func (node *Node) DynamicIndex(name string) (idx index.Index, err error) {
}

func (node *Node) Delete() (err error) {
// lock node
err = LockMgr.LockNode(node.Id)
if err != nil {
return
}
defer LockMgr.RemoveNode(node.Id)

// check to make sure this node isn't referenced by a vnode
virtualNodes := Nodes{}
if _, err = dbFind(bson.M{"file.virtual_parts": node.Id}, &virtualNodes, "", nil); err != nil {
Expand Down Expand Up @@ -306,7 +314,8 @@ func (node *Node) Delete() (err error) {
if err = dbDelete(bson.M{"id": node.Id}); err != nil {
return err
}
return node.Rmdir()
err = node.Rmdir()
return
}

func (node *Node) DeleteIndex(indextype string) (err error) {
Expand All @@ -319,22 +328,8 @@ func (node *Node) DeleteIndex(indextype string) (err error) {
return
}

func (node *Node) SetIndexInfo(indextype string, idxinfo IdxInfo) (err error) {
func (node *Node) SetIndexInfo(indextype string, idxinfo IdxInfo) {
node.Indexes[indextype] = idxinfo
err = node.Save()
return
}

func (node *Node) SetFileFormat(format string) (err error) {
node.File.Format = format
err = node.Save()
return
}

func (node *Node) SetPriority(priority int) (err error) {
node.Priority = priority
err = node.Save()
return
}

func (node *Node) SetExpiration(expire string) (err error) {
Expand All @@ -356,21 +351,6 @@ func (node *Node) SetExpiration(expire string) (err error) {
}

node.Expiration = currTime.Add(expireTime)
err = node.Save()
return
}

func (node *Node) RemoveExpiration() (err error) {
// reset to empty time
node.Expiration = time.Time{}
err = node.Save()
return
}

func (node *Node) ClearRevisions() (err error) {
// empty the revisions array
node.Revisions = []Node{}
err = node.Save()
return
}

Expand All @@ -384,7 +364,6 @@ func (node *Node) SetAttributes(attr FormFile) (err error) {
if err != nil {
return
}
err = node.Save()
return
}

Expand All @@ -393,6 +372,26 @@ func (node *Node) SetAttributesFromString(attributes string) (err error) {
if err != nil {
return
}
err = node.Save()
return
}

func (node *Node) UpdateDataTags(types string) {
tagslist := strings.Split(types, ",")
for _, newtag := range tagslist {
if contains(node.Tags, newtag) {
continue
}
node.Tags = append(node.Tags, newtag)
}
}

func (node *Node) UpdateLinkages(ltype string, ids string, operation string) {
var link linkage
link.Type = ltype
idList := strings.Split(ids, ",")
for _, id := range idList {
link.Ids = append(link.Ids, id)
}
link.Operation = operation
node.Linkages = append(node.Linkages, link)
}
13 changes: 1 addition & 12 deletions shock-server/node/parts.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,7 @@ func (node *Node) initParts(partsCount string, compressionFormat string) (err er
Parts: make([]partsFile, count),
Compression: compressionFormat,
}
if err = node.Save(); err != nil {
return err
}

// add node id to LockMgr
LockMgr.AddNode(node.Id)
return
}

Expand Down Expand Up @@ -82,7 +77,6 @@ func (node *Node) addVirtualParts(ids []string) (err error) {
} else {
return err
}
err = node.Save()
return
}

Expand Down Expand Up @@ -116,7 +110,6 @@ func (node *Node) addPart(n int, file *FormFile) (err error) {
if err = os.Rename(file.Path, fmt.Sprintf("%s/parts/%d", node.Path(), n+1)); err != nil {
return err
}
err = node.Save()
return
}

Expand All @@ -126,10 +119,6 @@ func (node *Node) closeParts(allowEmpty bool) (err error) {
if err = node.SetFileFromParts(allowEmpty); err != nil {
return err
}
if err = os.RemoveAll(node.Path() + "/parts/"); err != nil {
return err
}
// remove node id from LockMgr
LockMgr.RemoveNode(node.Id)
err = os.RemoveAll(node.Path() + "/parts/")
return
}
Loading

0 comments on commit 6b43e3a

Please sign in to comment.