From 98bd62410c99c4c258fd1d899073b5020fc262c4 Mon Sep 17 00:00:00 2001 From: Travis Harrison Date: Thu, 12 Apr 2018 15:44:38 -0500 Subject: [PATCH 1/2] only save node state at end of update, add node lock for life of update, use global lock object with timeout and cleanup --- shock-server/controller/node/index/index.go | 5 +- shock-server/main.go | 4 +- shock-server/node/expire.go | 2 + shock-server/node/fs.go | 6 +- shock-server/node/locker.go | 104 ++++++++++++++++++ shock-server/node/node.go | 72 ++++++------ shock-server/node/parts.go | 13 +-- shock-server/node/update.go | 116 +++++--------------- shock-server/node/util.go | 51 --------- 9 files changed, 179 insertions(+), 194 deletions(-) create mode 100644 shock-server/node/locker.go diff --git a/shock-server/controller/node/index/index.go b/shock-server/controller/node/index/index.go index 4af1c267..13852a29 100644 --- a/shock-server/controller/node/index/index.go +++ b/shock-server/controller/node/index/index.go @@ -300,8 +300,9 @@ func IndexTypedRequest(ctx context.Context) { return } - if err := n.SetIndexInfo(idxType, idxInfo); err != nil { - logger.Error("err@node.SetIndexInfo: " + err.Error()) + n.SetIndexInfo(idxType, idxInfo) + if err := n.Save(); err != nil { + logger.Error("err@node.Save: " + err.Error()) } if conf.LOG_PERF { diff --git a/shock-server/main.go b/shock-server/main.go index 447bb069..f5f03f28 100644 --- a/shock-server/main.go +++ b/shock-server/main.go @@ -119,8 +119,8 @@ func mapRoutes() { return nil }) - goweb.Map("/openparts", func(ctx context.Context) error { - ids := node.LockMgr.GetNodes() + goweb.Map("/locked", func(ctx context.Context) error { + ids := node.LockMgr.GetLocked() return responder.RespondWithData(ctx, ids) }) diff --git a/shock-server/node/expire.go b/shock-server/node/expire.go index 3f453767..be3d36f4 100644 --- a/shock-server/node/expire.go +++ b/shock-server/node/expire.go @@ -40,6 +40,8 @@ func (nr *NodeReaper) Handle() { logger.Error(err_msg) } } + // remove old nodes from Locker, value is hours old + LockMgr.RemoveOldNodes(1) } } diff --git a/shock-server/node/fs.go b/shock-server/node/fs.go index 96afaa55..74285f06 100644 --- a/shock-server/node/fs.go +++ b/shock-server/node/fs.go @@ -19,6 +19,7 @@ func (node *Node) SetFile(file FormFile) (err error) { if err != nil { return } + os.Rename(file.Path, node.FilePath()) node.File.Name = file.Name node.File.Size = fileStat.Size() @@ -38,7 +39,7 @@ func (node *Node) SetFile(file FormFile) (err error) { Format: "dynamic", CreatedOn: time.Now(), } - err = node.Save() + return } @@ -115,7 +116,6 @@ func (node *Node) SetFileFromSubset(subsetIndices FormFile) (err error) { CreatedOn: time.Now(), } - err = node.Save() return } @@ -210,7 +210,7 @@ func (node *Node) SetFileFromPath(path string, action string) (err error) { } else { node.File.Path = path } - err = node.Save() + return } diff --git a/shock-server/node/locker.go b/shock-server/node/locker.go new file mode 100644 index 00000000..ada12c2e --- /dev/null +++ b/shock-server/node/locker.go @@ -0,0 +1,104 @@ +package node + +import ( + "fmt" + "sync" + "time" +) + +var ( + LockMgr = NewLocker() +) + +func NewLocker() *Locker { + return &Locker{ + nodes: map[string]*NodeLock{}, + } +} + +type Locker struct { + nodes map[string]*NodeLock + sync.Mutex +} + +type NodeLock struct { + isLocked bool + updated time.Time + writeLock chan int +} + +func (n *NodeLock) init() { + n.isLocked = false + n.updated = time.Now() + n.writeLock <- 1 // Put the initial value into the channel +} + +func (n *NodeLock) lock(id string) (err error) { + select { + case <-n.writeLock: // Grab the ticket - here is where we wait + case <-time.After(time.Minute * 30): + err = fmt.Errorf("Timeout!! Waited 30 mins on lock for node %s", id) + return + } + n.isLocked = true + n.updated = time.Now() + return +} + +func (n *NodeLock) unlock() { + n.isLocked = false + n.updated = time.Now() + n.writeLock <- 1 // Release the ticket +} + +func (l *Locker) LockNode(id string) (err error) { + // add if missing, may happen if shock restarted + if _, ok := l.nodes[id]; !ok { + l.AddNode(id) + } + err = l.nodes[id].lock(id) + return +} + +func (l *Locker) UnlockNode(id string) { + // skip missing id + if _, ok := l.nodes[id]; ok { + l.nodes[id].unlock() + } +} + +func (l *Locker) GetLocked() (ids []string) { + l.Lock() + for id, n := range l.nodes { + if n.isLocked { + ids = append(ids, id) + } + } + l.Unlock() + return +} + +func (l *Locker) AddNode(id string) { + l.Lock() + l.nodes[id] = new(NodeLock) + l.nodes[id].init() + l.Unlock() +} + +func (l *Locker) RemoveNode(id string) { + l.Lock() + delete(l.nodes, id) + l.Unlock() +} + +func (l *Locker) RemoveOldNodes(hours int) { + currTime := time.Now() + expireTime := currTime.Add(time.Duration(hours*-1) * time.Hour) + l.Lock() + for id, n := range l.nodes { + if (!n.isLocked) && n.updated.Before(expireTime) { + delete(l.nodes, id) + } + } + l.Unlock() +} diff --git a/shock-server/node/node.go b/shock-server/node/node.go index 7401a76a..bfff1344 100644 --- a/shock-server/node/node.go +++ b/shock-server/node/node.go @@ -15,6 +15,7 @@ import ( "io/ioutil" "os" "strconv" + "strings" "time" ) @@ -130,13 +131,12 @@ func CreateNodeUpload(u *user.User, params map[string]string, files FormFiles) ( return } + // update saves node err = node.Update(params, files) if err != nil { node.Rmdir() - return } - err = node.Save() return } @@ -219,9 +219,9 @@ func CreateNodesFromArchive(u *user.User, params map[string]string, files FormFi // save nodes, only return those that were created / saved for _, n := range tempNodes { - if err = n.Save(); err != nil { + if serr := n.Save(); serr != nil { n.Rmdir() - return nil, err + continue } nodes = append(nodes, n) } @@ -264,6 +264,13 @@ func (node *Node) DynamicIndex(name string) (idx index.Index, err error) { } func (node *Node) Delete() (err error) { + // lock node + err = LockMgr.LockNode(node.Id) + if err != nil { + return + } + defer LockMgr.RemoveNode(node.Id) + // check to make sure this node isn't referenced by a vnode virtualNodes := Nodes{} if _, err = dbFind(bson.M{"file.virtual_parts": node.Id}, &virtualNodes, "", nil); err != nil { @@ -306,7 +313,8 @@ func (node *Node) Delete() (err error) { if err = dbDelete(bson.M{"id": node.Id}); err != nil { return err } - return node.Rmdir() + err = node.Rmdir() + return } func (node *Node) DeleteIndex(indextype string) (err error) { @@ -319,22 +327,8 @@ func (node *Node) DeleteIndex(indextype string) (err error) { return } -func (node *Node) SetIndexInfo(indextype string, idxinfo IdxInfo) (err error) { +func (node *Node) SetIndexInfo(indextype string, idxinfo IdxInfo) { node.Indexes[indextype] = idxinfo - err = node.Save() - return -} - -func (node *Node) SetFileFormat(format string) (err error) { - node.File.Format = format - err = node.Save() - return -} - -func (node *Node) SetPriority(priority int) (err error) { - node.Priority = priority - err = node.Save() - return } func (node *Node) SetExpiration(expire string) (err error) { @@ -356,21 +350,6 @@ func (node *Node) SetExpiration(expire string) (err error) { } node.Expiration = currTime.Add(expireTime) - err = node.Save() - return -} - -func (node *Node) RemoveExpiration() (err error) { - // reset to empty time - node.Expiration = time.Time{} - err = node.Save() - return -} - -func (node *Node) ClearRevisions() (err error) { - // empty the revisions array - node.Revisions = []Node{} - err = node.Save() return } @@ -384,7 +363,6 @@ func (node *Node) SetAttributes(attr FormFile) (err error) { if err != nil { return } - err = node.Save() return } @@ -393,6 +371,26 @@ func (node *Node) SetAttributesFromString(attributes string) (err error) { if err != nil { return } - err = node.Save() return } + +func (node *Node) UpdateDataTags(types string) { + tagslist := strings.Split(types, ",") + for _, newtag := range tagslist { + if contains(node.Tags, newtag) { + continue + } + node.Tags = append(node.Tags, newtag) + } +} + +func (node *Node) UpdateLinkages(ltype string, ids string, operation string) { + var link linkage + link.Type = ltype + idList := strings.Split(ids, ",") + for _, id := range idList { + link.Ids = append(link.Ids, id) + } + link.Operation = operation + node.Linkages = append(node.Linkages, link) +} diff --git a/shock-server/node/parts.go b/shock-server/node/parts.go index cb8942bc..95ea9568 100644 --- a/shock-server/node/parts.go +++ b/shock-server/node/parts.go @@ -45,12 +45,7 @@ func (node *Node) initParts(partsCount string, compressionFormat string) (err er Parts: make([]partsFile, count), Compression: compressionFormat, } - if err = node.Save(); err != nil { - return err - } - // add node id to LockMgr - LockMgr.AddNode(node.Id) return } @@ -82,7 +77,6 @@ func (node *Node) addVirtualParts(ids []string) (err error) { } else { return err } - err = node.Save() return } @@ -116,7 +110,6 @@ func (node *Node) addPart(n int, file *FormFile) (err error) { if err = os.Rename(file.Path, fmt.Sprintf("%s/parts/%d", node.Path(), n+1)); err != nil { return err } - err = node.Save() return } @@ -126,10 +119,6 @@ func (node *Node) closeParts(allowEmpty bool) (err error) { if err = node.SetFileFromParts(allowEmpty); err != nil { return err } - if err = os.RemoveAll(node.Path() + "/parts/"); err != nil { - return err - } - // remove node id from LockMgr - LockMgr.RemoveNode(node.Id) + err = os.RemoveAll(node.Path() + "/parts/") return } diff --git a/shock-server/node/update.go b/shock-server/node/update.go index b6da4ddc..016b50bc 100644 --- a/shock-server/node/update.go +++ b/shock-server/node/update.go @@ -32,7 +32,22 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) // // All condition allow setting of attributes // - // Note that all paths for node operations in this function must end with "err = node.Save()" to save node state. + // state is saved to mongodb at end of update function + + // global lock on a node that is being updated + err = LockMgr.LockNode(node.Id) + if err != nil { + return + } + defer LockMgr.UnlockNode(node.Id) + + // refresh node state + var n *Node + n, err = Load(node.Id) + if err != nil { + return + } + node = n for _, u := range util.ValidUpload { if _, uploadMisplaced := params[u]; uploadMisplaced { @@ -117,16 +132,6 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) if (node.Type != "parts") || (node.Parts == nil) || !node.Parts.VarLen { return errors.New("can only call 'close' on unknown parts node") } - // we do a node level lock here incase its processing a part - // Refresh parts information after locking, before saving. - LockMgr.LockNode(node.Id) - n, err := Load(node.Id) - if err != nil { - LockMgr.UnlockNode(node.Id) - return err - } - node.Parts = n.Parts - // closeParts removes node id from LockMgr, no need unlock if err = node.closeParts(true); err != nil { return err } @@ -141,7 +146,6 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) } } if params["parts"] == "unknown" { - // initParts adds node id to LockMgr if err = node.initParts("unknown", compressionFormat); err != nil { return err } @@ -153,7 +157,6 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) if n < 1 { return errors.New("parts cannot be less than 1") } - // initParts adds node id to LockMgr if err = node.initParts(params["parts"], compressionFormat); err != nil { return err } @@ -239,15 +242,11 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) } } // copy index struct - if err := node.SetIndexInfo(idxType, idxInfo); err != nil { - return err - } + node.SetIndexInfo(idxType, idxInfo) } } else if sizeIndex, exists := n.Indexes["size"]; exists { // just copy size index - if err := node.SetIndexInfo("size", sizeIndex); err != nil { - return err - } + node.SetIndexInfo("size", sizeIndex) } if n.File.Path == "" { @@ -255,10 +254,6 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) } else { node.File.Path = n.File.Path } - - if err = node.Save(); err != nil { - return err - } } else if isSubsetUpload { fInfo, statErr := os.Stat(files["subset_indices"].Path) if statErr != nil { @@ -315,10 +310,6 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) return err } delete(files, "subset_indices") - } else { - if err = node.Save(); err != nil { - return err - } } } } @@ -342,16 +333,11 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) if err = node.SetAttributesFromString(params["attributes_str"]); err != nil { return err } - delete(params, "attributes_str") } // set filename string if _, hasFileNameStr := params["file_name"]; hasFileNameStr { node.File.Name = params["file_name"] - if err = node.Save(); err != nil { - return err - } - delete(params, "file_name") } // update relatives @@ -373,16 +359,12 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) if _, hasOp := params["operation"]; hasOp { operation = params["operation"] } - if err = node.UpdateLinkages(ltype, ids, operation); err != nil { - return err - } + node.UpdateLinkages(ltype, ids, operation) } // update node tags if _, hasDataType := params["tags"]; hasDataType { - if err = node.UpdateDataTags(params["tags"]); err != nil { - return err - } + node.UpdateDataTags(params["tags"]) } // update file format @@ -390,9 +372,7 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) if node.File.Format != "" { return errors.New(fmt.Sprintf("file format already set:%s", node.File.Format)) } - if err = node.SetFileFormat(params["format"]); err != nil { - return err - } + node.File.Format = params["format"] } // update priority @@ -401,9 +381,7 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) if err != nil { return errors.New("priority must be an integer") } - if err = node.SetPriority(priority); err != nil { - return err - } + node.Priority = priority } // update node expiration @@ -413,19 +391,17 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) } } if _, hasRemove := params["remove_expiration"]; hasRemove { - if err = node.RemoveExpiration(); err != nil { - return err - } + // reset to empty time + node.Expiration = time.Time{} } // clear node revisions if _, hasClearRevisions := params["clear_revisions"]; hasClearRevisions { - if err = node.ClearRevisions(); err != nil { - return err - } + // empty the revisions array + node.Revisions = []Node{} } - // handle part file / we do a node level lock here + // handle part file if hasPartsFile { if node.HasFile() { return errors.New(e.FileImut) @@ -433,16 +409,6 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) if (node.Type != "parts") || (node.Parts == nil) { return errors.New("This is not a parts node and thus does not support uploading in parts.") } - LockMgr.LockNode(node.Id) - defer LockMgr.UnlockNode(node.Id) - - // Refresh parts information after locking, before saving. - // Load node by id - n, err := Load(node.Id) - if err != nil { - return err - } - node.Parts = n.Parts if node.Parts.Count > 0 || node.Parts.VarLen { for key, file := range files { @@ -457,7 +423,6 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) return errors.New("Unable to retrieve parts info for node.") } // all parts are in, close it - // closeParts removes node id from LockMgr if !node.Parts.VarLen && node.Parts.Length == node.Parts.Count { if err = node.closeParts(false); err != nil { return err @@ -465,6 +430,8 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) } } + // save only after all updates applied + err = node.Save() return } @@ -549,28 +516,3 @@ func (node *Node) UpdateVersion() (err error) { node.VersionParts = versionParts return } - -func (node *Node) UpdateLinkages(ltype string, ids string, operation string) (err error) { - var link linkage - link.Type = ltype - idList := strings.Split(ids, ",") - for _, id := range idList { - link.Ids = append(link.Ids, id) - } - link.Operation = operation - node.Linkages = append(node.Linkages, link) - err = node.Save() - return -} - -func (node *Node) UpdateDataTags(types string) (err error) { - tagslist := strings.Split(types, ",") - for _, newtag := range tagslist { - if contains(node.Tags, newtag) { - continue - } - node.Tags = append(node.Tags, newtag) - } - err = node.Save() - return -} diff --git a/shock-server/node/util.go b/shock-server/node/util.go index d13481eb..1f8c551c 100644 --- a/shock-server/node/util.go +++ b/shock-server/node/util.go @@ -2,7 +2,6 @@ package node import ( "sort" - "sync" ) type mappy map[string]bool @@ -16,56 +15,6 @@ func IsInMappy(item string, mp mappy) bool { var virtIdx = mappy{"size": true} -var ( - LockMgr = NewLocker() -) - -type Locker struct { - nLock map[string]*NodeLock -} - -type NodeLock struct { - sync.Mutex -} - -func NewLocker() *Locker { - return &Locker{ - nLock: map[string]*NodeLock{}, - } -} - -func (l *Locker) LockNode(id string) { - // add if missing, may happen if shock restarted - if _, ok := l.nLock[id]; !ok { - l.nLock[id] = new(NodeLock) - } - l.nLock[id].Lock() -} - -func (l *Locker) UnlockNode(id string) { - // skip missing id - if _, ok := l.nLock[id]; ok { - l.nLock[id].Unlock() - } -} - -func (l *Locker) AddNode(id string) { - if _, ok := l.nLock[id]; !ok { - l.nLock[id] = new(NodeLock) - } -} - -func (l *Locker) RemoveNode(id string) { - delete(l.nLock, id) -} - -func (l *Locker) GetNodes() (ids []string) { - for id, _ := range l.nLock { - ids = append(ids, id) - } - return -} - type sortBytes []byte func (b sortBytes) Less(i, j int) bool { From 87889d3fdac293229212c6d29a77f910a48f2d1a Mon Sep 17 00:00:00 2001 From: Travis Harrison Date: Fri, 13 Apr 2018 11:51:55 -0500 Subject: [PATCH 2/2] fix lock init, add more helpful error messages --- shock-server/controller/node/update.go | 2 +- shock-server/node/locker.go | 1 + shock-server/node/node.go | 3 +- shock-server/node/update.go | 234 ++++++++++++++++--------- 4 files changed, 152 insertions(+), 88 deletions(-) diff --git a/shock-server/controller/node/update.go b/shock-server/controller/node/update.go index 63b23723..ab4ac8fc 100644 --- a/shock-server/controller/node/update.go +++ b/shock-server/controller/node/update.go @@ -82,7 +82,7 @@ func (cr *NodeController) Replace(id string, ctx context.Context) error { } } - err = n.Update(params, files) + err = n.Update(params, files, false) if err != nil { err_msg := "err@node_Update: " + id + ": " + err.Error() logger.Error(err_msg) diff --git a/shock-server/node/locker.go b/shock-server/node/locker.go index ada12c2e..2fb352d4 100644 --- a/shock-server/node/locker.go +++ b/shock-server/node/locker.go @@ -30,6 +30,7 @@ type NodeLock struct { func (n *NodeLock) init() { n.isLocked = false n.updated = time.Now() + n.writeLock = make(chan int, 1) n.writeLock <- 1 // Put the initial value into the channel } diff --git a/shock-server/node/node.go b/shock-server/node/node.go index bfff1344..372bc3f0 100644 --- a/shock-server/node/node.go +++ b/shock-server/node/node.go @@ -132,8 +132,9 @@ func CreateNodeUpload(u *user.User, params map[string]string, files FormFiles) ( } // update saves node - err = node.Update(params, files) + err = node.Update(params, files, true) if err != nil { + err = fmt.Errorf("(node.Update) %s", err.Error()) node.Rmdir() } diff --git a/shock-server/node/update.go b/shock-server/node/update.go index 016b50bc..3107d038 100644 --- a/shock-server/node/update.go +++ b/shock-server/node/update.go @@ -19,7 +19,7 @@ import ( ) //Modification functions -func (node *Node) Update(params map[string]string, files FormFiles) (err error) { +func (node *Node) Update(params map[string]string, files FormFiles, isNew bool) (err error) { // Exclusive conditions // 1.1. has files[upload] (regular upload) // 1.2. has files[gzip] (compressed upload) @@ -37,21 +37,26 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) // global lock on a node that is being updated err = LockMgr.LockNode(node.Id) if err != nil { + err = fmt.Errorf("(LockMgr.LockNode) %s", err.Error()) return } defer LockMgr.UnlockNode(node.Id) - // refresh node state - var n *Node - n, err = Load(node.Id) - if err != nil { - return + // refresh node state if not new + if !isNew { + var n *Node + n, err = Load(node.Id) + if err != nil { + err = fmt.Errorf("(node.Load) %s", err.Error()) + return + } + node = n } - node = n for _, u := range util.ValidUpload { if _, uploadMisplaced := params[u]; uploadMisplaced { - return errors.New(fmt.Sprintf("%s form field must be file encoded", u)) + err = fmt.Errorf("form field '%s' must be file encoded", u) + return } } @@ -66,7 +71,8 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) } } if uploadCount > 1 { - return errors.New("only one upload file allowed") + err = fmt.Errorf("only one upload file allowed") + return } isUrlUpload := false @@ -92,51 +98,62 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) // Check exclusive conditions if isRegularUpload && (isUrlUpload || isPartialUpload || isPathUpload || isVirtualNode || isCopyUpload || isSubsetUpload) { - return errors.New("upload parameter incompatible with upload_url, parts, path, type, copy_data and/or parent_node parameter(s)") + err = errors.New("upload parameter incompatible with upload_url, parts, path, type, copy_data and/or parent_node parameter(s)") } else if isUrlUpload && (isRegularUpload || isPartialUpload || isPathUpload || isVirtualNode || isCopyUpload || isSubsetUpload) { - return errors.New("upload_url parameter incompatible with upload, parts, path, type, copy_data and/or parent_node parameter(s)") + err = errors.New("upload_url parameter incompatible with upload, parts, path, type, copy_data and/or parent_node parameter(s)") } else if isPartialUpload && (isVirtualNode || isPathUpload || isCopyUpload || isSubsetUpload) { - return errors.New("parts parameter incompatible with type, path, copy_data and/or parent_node parameter(s)") + err = errors.New("parts parameter incompatible with type, path, copy_data and/or parent_node parameter(s)") } else if isVirtualNode && (isPathUpload || isCopyUpload || isSubsetUpload) { - return errors.New("type parameter incompatible with path, copy_data and/or parent_node parameter") + err = errors.New("type parameter incompatible with path, copy_data and/or parent_node parameter") } else if isPathUpload && (isCopyUpload || isSubsetUpload) { - return errors.New("path parameter incompatible with copy_data and/or parent_node parameter") + err = errors.New("path parameter incompatible with copy_data and/or parent_node parameter") } else if isCopyUpload && isSubsetUpload { - return errors.New("copy_data parameter incompatible with parent_node parameter") + err = errors.New("copy_data parameter incompatible with parent_node parameter") } else if hasPartsFile && (isRegularUpload || isUrlUpload) { - return errors.New("parts file and upload or upload_url parameters are incompatible") + err = errors.New("parts file and upload or upload_url parameters are incompatible") } else if (node.Type == "parts") && (isRegularUpload || isUrlUpload) { - return errors.New("parts node and upload or upload_url parameters are incompatible") + err = errors.New("parts node and upload or upload_url parameters are incompatible") } else if isPartialUpload && hasPartsFile { - return errors.New("can not upload parts file when creating parts node") + err = errors.New("can not upload parts file when creating parts node") } // Check if immutable if node.HasFile() && (isRegularUpload || isUrlUpload || isPartialUpload || hasPartsFile || isVirtualNode || isPathUpload || isCopyUpload || isSubsetUpload) { - return errors.New(e.FileImut) + err = errors.New(e.FileImut) + } + + // we found an error + if err != nil { + return } + // process upload file if isRegularUpload { if err = node.SetFile(files[uploadFile]); err != nil { - return err + err = fmt.Errorf("(node.SetFile) %s", err.Error()) + return } delete(files, uploadFile) } else if isUrlUpload { if err = node.SetFile(files["upload_url"]); err != nil { - return err + err = fmt.Errorf("(node.SetFile) %s", err.Error()) + return } delete(files, "upload_url") } else if isPartialUpload { // close variable length parts if params["parts"] == "close" { if (node.Type != "parts") || (node.Parts == nil) || !node.Parts.VarLen { - return errors.New("can only call 'close' on unknown parts node") + err = errors.New("can only call 'close' on unknown / variable length parts node") + return } if err = node.closeParts(true); err != nil { - return err + err = fmt.Errorf("(node.closeParts) %s", err.Error()) + return } } else if (node.Parts != nil) && (node.Parts.VarLen || node.Parts.Count > 0) { - return errors.New("parts already set") + err = errors.New("parts already set") + return } else { // set parts struct var compressionFormat string = "" @@ -147,18 +164,22 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) } if params["parts"] == "unknown" { if err = node.initParts("unknown", compressionFormat); err != nil { - return err + err = fmt.Errorf("(node.initParts) %s", err.Error()) + return } } else { - n, err := strconv.Atoi(params["parts"]) - if err != nil { - return errors.New("parts must be an integer or 'unknown'") + n, serr := strconv.Atoi(params["parts"]) + if serr != nil { + err = errors.New("parts value must be an integer or 'unknown'") + return } if n < 1 { - return errors.New("parts cannot be less than 1") + err = errors.New("parts value cannot be less than 1") + return } if err = node.initParts(params["parts"], compressionFormat); err != nil { - return err + err = fmt.Errorf("(node.initParts() %s", err.Error()) + return } } } @@ -168,38 +189,45 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) ids := strings.Split(source, ",") node.addVirtualParts(ids) } else { - return errors.New("type virtual requires source parameter") + err = errors.New("type virtual requires source parameter") + return } } else if isPathUpload { if action, hasAction := params["action"]; !hasAction || (action != "copy_file" && action != "move_file" && action != "keep_file") { - return errors.New("path upload requires action field equal to copy_file, move_file or keep_file") + err = errors.New("path upload requires action field equal to copy_file, move_file, or keep_file") + return } localpaths := strings.Split(conf.PATH_LOCAL, ",") if len(localpaths) <= 0 { - return errors.New("local files path uploads must be configured. Please contact your Shock administrator.") + err = errors.New("local files path uploads must be configured. Please contact your Shock administrator.") + return } var success = false for _, p := range localpaths { if strings.HasPrefix(params["path"], p) { if err = node.SetFileFromPath(params["path"], params["action"]); err != nil { - return err + err = fmt.Errorf("(node.SetFileFromPath) %s", err.Error()) + return } else { success = true } } } if !success { - return errors.New("file not in local files path. Please contact your Shock administrator.") + err = errors.New("file not in local files path. Please contact your Shock administrator.") + return } } else if isCopyUpload { var n *Node n, err = Load(params["copy_data"]) if err != nil { - return err + err = fmt.Errorf("(node.Load) %s", err.Error()) + return } if n.File.Virtual { - return errors.New("copy_data parameter points to a virtual node, invalid operation.") + err = errors.New("copy_data parameter points to a virtual node, invalid operation.") + return } // Copy node file information @@ -213,12 +241,13 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) node.Subset = n.Subset subsetIndexFile := n.Path() + "/" + n.Id + ".subset.idx" // The subset index file is required for creating a copy of a subset node. - if _, err := os.Stat(subsetIndexFile); err == nil { - if _, cerr := util.CopyFile(subsetIndexFile, node.Path()+"/"+node.Id+".subset.idx"); cerr != nil { - return cerr - } - } else { - return err + if _, statErr := os.Stat(subsetIndexFile); statErr != nil { + err = fmt.Errorf("(os.Stat) %s", statErr.Error()) + return + } + if _, copyErr := util.CopyFile(subsetIndexFile, node.Path()+"/"+node.Id+".subset.idx"); copyErr != nil { + err = fmt.Errorf("(util.CopyFile) %s", copyErr.Error()) + return } node.Type = "subset" } else { @@ -235,11 +264,14 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) // loop through parent indexes for idxType, idxInfo := range n.Indexes { parentFile := n.IndexPath() + "/" + idxType + ".idx" - if _, err := os.Stat(parentFile); err == nil { - // copy file if exists - if _, cerr := util.CopyFile(parentFile, node.IndexPath()+"/"+idxType+".idx"); cerr != nil { - return cerr - } + if _, statErr := os.Stat(parentFile); statErr != nil { + err = fmt.Errorf("(os.Stat) %s", statErr.Error()) + return + } + // copy file if exists + if _, copyErr := util.CopyFile(parentFile, node.IndexPath()+"/"+idxType+".idx"); copyErr != nil { + err = fmt.Errorf("(util.CopyFile) %s", copyErr.Error()) + return } // copy index struct node.SetIndexInfo(idxType, idxInfo) @@ -257,40 +289,47 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) } else if isSubsetUpload { fInfo, statErr := os.Stat(files["subset_indices"].Path) if statErr != nil { - return errors.New("Could not stat uploaded subset_indices file.") + err = fmt.Errorf("(os.Stat) %s", statErr.Error()) + return } node.Type = "subset" if fInfo.Size() == 0 { // if upload file is empty, make a basic node with empty file if err = node.SetFile(files["subset_indices"]); err != nil { - return err + err = fmt.Errorf("(node.SetFile) %s", err.Error()) + return } delete(files, "subset_indices") } else { // process subset upload _, hasParentIndex := params["parent_index"] if !hasParentIndex { - return errors.New("parent_index is a required parameter for creating a subset node.") + err = errors.New("parent_index is a required parameter for creating a subset node.") + return } var n *Node n, err = Load(params["parent_node"]) if err != nil { - return err + err = fmt.Errorf("(node.Load) %s", err.Error()) + return } if n.File.Virtual { - return errors.New("parent_node parameter points to a virtual node, invalid operation.") + err = errors.New("parent_node parameter points to a virtual node, invalid operation.") + return } if _, indexExists := n.Indexes[params["parent_index"]]; !indexExists { - return errors.New("Index '" + params["parent_index"] + "' does not exist for parent node.") + err = fmt.Errorf("Index '%s' does not exist for parent node.", params["parent_index"]) + return } parentIndexFile := n.IndexPath() + "/" + params["parent_index"] + ".idx" if _, statErr := os.Stat(parentIndexFile); statErr != nil { - return errors.New("Could not stat index file for parent node where parent node = '" + params["parent_node"] + "' and index = '" + params["parent_index"] + "'.") + err = fmt.Errorf("Could not stat index file for parent node where parent node = '%s' and index = '%s'", params["parent_node"], params["parent_index"]) + return } // Copy node file information @@ -307,7 +346,8 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) if _, hasSubsetList := files["subset_indices"]; hasSubsetList { if err = node.SetFileFromSubset(files["subset_indices"]); err != nil { - return err + err = fmt.Errorf("(node.SetFileFromSubset) %s", err.Error()) + return } delete(files, "subset_indices") } @@ -317,10 +357,12 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) // set attributes from file if _, hasAttr := files["attributes"]; hasAttr { if _, hasAttrStr := params["attributes_str"]; hasAttrStr { - return errors.New("Cannot define an attributes file and an attributes_str parameter in the same request.") + err = errors.New("Cannot define an attributes file and an attributes_str parameter in the same request.") + return } if err = node.SetAttributes(files["attributes"]); err != nil { - return err + err = fmt.Errorf("(node.SetAttributes) %s", err.Error()) + return } delete(files, "attributes") } @@ -328,10 +370,12 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) // set attributes from json string if _, hasAttrStr := params["attributes_str"]; hasAttrStr { if _, hasAttr := files["attributes"]; hasAttr { - return errors.New("Cannot define an attributes file and an attributes_str parameter in the same request.") + err = errors.New("Cannot define an attributes file and an attributes_str parameter in the same request.") + return } if err = node.SetAttributesFromString(params["attributes_str"]); err != nil { - return err + err = fmt.Errorf("(node.SetAttributesFromString) %s", err.Error()) + return } } @@ -346,14 +390,16 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) if ltype == "parent" { if node.HasParent() { - return errors.New(e.ProvenanceImut) + err = errors.New(e.ProvenanceImut) + return } } var ids string if _, hasIds := params["ids"]; hasIds { ids = params["ids"] } else { - return errors.New("missing ids for updating relatives") + err = errors.New("missing ids for updating relatives") + return } var operation string if _, hasOp := params["operation"]; hasOp { @@ -370,16 +416,18 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) // update file format if _, hasFormat := params["format"]; hasFormat { if node.File.Format != "" { - return errors.New(fmt.Sprintf("file format already set:%s", node.File.Format)) + err = fmt.Errorf("file format already set: %s", node.File.Format) + return } node.File.Format = params["format"] } // update priority if _, hasPriority := params["priority"]; hasPriority { - priority, err := strconv.Atoi(params["priority"]) - if err != nil { - return errors.New("priority must be an integer") + priority, serr := strconv.Atoi(params["priority"]) + if serr != nil { + err = errors.New("priority must be an integer") + return } node.Priority = priority } @@ -387,7 +435,8 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) // update node expiration if _, hasExpiration := params["expiration"]; hasExpiration { if err = node.SetExpiration(params["expiration"]); err != nil { - return err + err = fmt.Errorf("(node.SetExpiration) %s", err.Error()) + return } } if _, hasRemove := params["remove_expiration"]; hasRemove { @@ -404,10 +453,12 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) // handle part file if hasPartsFile { if node.HasFile() { - return errors.New(e.FileImut) + err = errors.New(e.FileImut) + return } if (node.Type != "parts") || (node.Parts == nil) { - return errors.New("This is not a parts node and thus does not support uploading in parts.") + err = errors.New("This is not a parts node and thus does not support uploading in parts.") + return } if node.Parts.Count > 0 || node.Parts.VarLen { @@ -415,23 +466,28 @@ func (node *Node) Update(params map[string]string, files FormFiles) (err error) keyn, errf := strconv.Atoi(key) if errf == nil && (keyn <= node.Parts.Count || node.Parts.VarLen) { if err = node.addPart(keyn-1, &file); err != nil { - return err + err = fmt.Errorf("(node.addPart) %s", err.Error()) + return } } } } else { - return errors.New("Unable to retrieve parts info for node.") + err = errors.New("Unable to retrieve parts info for node.") + return } // all parts are in, close it if !node.Parts.VarLen && node.Parts.Length == node.Parts.Count { if err = node.closeParts(false); err != nil { - return err + err = fmt.Errorf("(node.closeParts) %s", err.Error()) + return } } } // save only after all updates applied - err = node.Save() + if err = node.Save(); err != nil { + err = fmt.Errorf("(node.Save) %s", err.Error()) + } return } @@ -460,27 +516,32 @@ func (node *Node) Save() (err error) { node.LastModified = time.Now() } // get bson, test size and print - nbson, err := bson.Marshal(node) - if err != nil { - return err + nbson, merr := bson.Marshal(node) + if merr != nil { + err = fmt.Errorf("(bson.Marshal) %s", err.Error()) + return } if len(nbson) >= DocumentMaxByte { - return errors.New(fmt.Sprintf("bson document size is greater than limit of %d bytes", DocumentMaxByte)) + err = fmt.Errorf("bson document size is greater than limit of %d bytes", DocumentMaxByte) + return } bsonPath := fmt.Sprintf("%s/%s.bson", node.Path(), node.Id) os.Remove(bsonPath) - if err := ioutil.WriteFile(bsonPath, nbson, 0644); err != nil { + if err = ioutil.WriteFile(bsonPath, nbson, 0644); err != nil { // dir path may be missing, recreate and try again - if err := node.Mkdir(); err != nil { - return err + if err = node.Mkdir(); err != nil { + err = fmt.Errorf("(node.Mkdir) %s", err.Error()) + return } - if err := ioutil.WriteFile(bsonPath, nbson, 0644); err != nil { - return err + if err = ioutil.WriteFile(bsonPath, nbson, 0644); err != nil { + err = fmt.Errorf("(ioutil.WriteFile) %s", err.Error()) + return } } // save node to mongodb - if err := dbUpsert(node); err != nil { - return err + if err = dbUpsert(node); err != nil { + err = fmt.Errorf("(node.dbUpsert) %s", err.Error()) + return } return } @@ -499,8 +560,9 @@ func (node *Node) UpdateVersion() (err error) { sort.Strings(partKeys) for _, k := range partKeys { - j, er := json.Marshal(partMap[k]) - if er != nil { + j, jerr := json.Marshal(partMap[k]) + if jerr != nil { + err = fmt.Errorf("(json.Marshal) %s", err.Error()) return } // need to sort bytes to deal with unordered json