Skip to content

Commit

Permalink
Minor fixes for cache udates
Browse files Browse the repository at this point in the history
  • Loading branch information
frikky committed Mar 25, 2021
1 parent 9604a4b commit 3c2063e
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 62 deletions.
116 changes: 96 additions & 20 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
var err error
var requestCache *cache.Cache

var maxCacheSize = 1020000

//var maxCacheSize = 2000000

// Cache handlers
func DeleteCache(ctx context.Context, name string) error {
if project.Environment == "cloud" {
Expand All @@ -29,7 +33,7 @@ func DeleteCache(ctx context.Context, name string) error {
return errors.New(fmt.Sprintf("No cache handler for environment %s yet WHILE DELETING", project.Environment))
}

return errors.New(fmt.Sprintf("No cache found for %s", name))
return errors.New(fmt.Sprintf("No cache found for %s when DELETING cache", name))
}

// Cache handlers
Expand All @@ -39,7 +43,32 @@ func GetCache(ctx context.Context, name string) (interface{}, error) {
} else if err != nil {
return "", errors.New(fmt.Sprintf("Failed getting CLOUD cache for %s: %s", name, err))
} else {
return item.Value, nil
// Loops if cachesize is more than max allowed in memcache (multikey)
if len(item.Value) == maxCacheSize {
totalData := item.Value
keyCount := 1
keyname := fmt.Sprintf("%s_%d", name, keyCount)
for {
if item, err := memcache.Get(ctx, keyname); err == memcache.ErrCacheMiss {
break
} else {
totalData = append(totalData, item.Value...)

//log.Printf("%d - %d = ", len(item.Value), maxCacheSize)
if len(item.Value) != maxCacheSize {
break
}
}

keyCount += 1
keyname = fmt.Sprintf("%s_%d", name, keyCount)
}

log.Printf("[INFO] CACHE: TOTAL SIZE FOR %s: %d", name, len(totalData))
return totalData, nil
} else {
return item.Value, nil
}
}
} else if project.Environment == "onprem" {
//log.Printf("[INFO] GETTING CACHE FOR %s ONPREM", name)
Expand All @@ -60,23 +89,66 @@ func SetCache(ctx context.Context, name string, data []byte) error {
// Maxsize ish~

if project.Environment == "cloud" {
maxSize := 1020000
if len(data) > maxCacheSize*10 {
return errors.New(fmt.Sprintf("Couldn't set cache for %s - too large: %d > %d", name, len(data), maxCacheSize*10))
}
loop := false
if len(data) > maxSize {
if len(data) > maxCacheSize {
loop = true
//log.Printf("Should make multiple cache items for %s", name)
return errors.New(fmt.Sprintf("Couldn't set cache for %s - too large: %d > %d", name, len(data), maxSize))
}
_ = loop

item := &memcache.Item{
Key: name,
Value: data,
Expiration: time.Minute * 30,
}
// Custom for larger sizes. Max is maxSize*10 when being set
if loop {
currentChunk := 0
keyAmount := 0
totalAdded := 0
chunkSize := maxCacheSize
nextStep := chunkSize
keyname := name

for {
if len(data) < nextStep {
nextStep = len(data)
}

if err := memcache.Set(ctx, item); err != nil {
log.Printf("[WARNING] Failed setting cache for %s: %s", name, err)
//log.Printf("%d - %d = ", currentChunk, nextStep)
parsedData := data[currentChunk:nextStep]
item := &memcache.Item{
Key: keyname,
Value: parsedData,
Expiration: time.Minute * 30,
}

if err := memcache.Set(ctx, item); err != nil {
log.Printf("[WARNING] Failed setting cache for %s: %s", keyname, err)
break
} else {
totalAdded += chunkSize
currentChunk = nextStep
nextStep += chunkSize

keyAmount += 1
//log.Printf("%s: %d: %d", keyname, totalAdded, len(data))

keyname = fmt.Sprintf("%s_%d", name, keyAmount)
if totalAdded > len(data) {
break
}
}
}

log.Printf("[INFO] Set app cache with length %d and %d keys", len(data), keyAmount)
} else {
item := &memcache.Item{
Key: name,
Value: data,
Expiration: time.Minute * 30,
}

if err := memcache.Set(ctx, item); err != nil {
log.Printf("[WARNING] Failed setting cache for %s: %s", name, err)
}
}

return nil
Expand Down Expand Up @@ -117,21 +189,21 @@ func SetWorkflowAppDatastore(ctx context.Context, workflowapp WorkflowApp, id st
func SetWorkflowExecution(ctx context.Context, workflowExecution WorkflowExecution, dbSave bool) error {
//log.Printf("\n\n\nRESULT: %s\n\n\n", workflowExecution.Status)
if len(workflowExecution.ExecutionId) == 0 {
log.Printf("Workflowexeciton executionId can't be empty.")
log.Printf("[WARNING] Workflowexeciton executionId can't be empty.")
return errors.New("ExecutionId can't be empty.")
}

cacheKey := fmt.Sprintf("workflowexecution-%s", workflowExecution.ExecutionId)
executionData, err := json.Marshal(workflowExecution)
if err != nil {
log.Printf("[WARNING] Failed marshalling execution: %s", err)
log.Printf("[WARNING] Failed marshalling execution for cache: %s", err)

err = SetCache(ctx, cacheKey, executionData)
if err != nil {
log.Printf("[WARNING] Failed updating execution: %s", err)
log.Printf("[WARNING] Failed updating execution cache: %s", err)
}
} else {
log.Printf("[WARNING] Failed to set execution cache for workflow.")
log.Printf("[INFO] Set execution cache for workflowexecution %s", cacheKey)
}

//requestCache.Set(cacheKey, &workflowExecution, cache.DefaultExpiration)
Expand Down Expand Up @@ -238,13 +310,13 @@ func SetInitExecutionVariables(ctx context.Context, workflowExecution WorkflowEx
if sourceFound {
parents[branch.DestinationID] = append(parents[branch.DestinationID], branch.SourceID)
} else {
log.Printf("ID %s was not found in actions! Skipping parent. (TRIGGER?)", branch.SourceID)
log.Printf("[INFO] ID %s was not found in actions! Skipping parent. (TRIGGER?)", branch.SourceID)
}

if destinationFound {
children[branch.SourceID] = append(children[branch.SourceID], branch.DestinationID)
} else {
log.Printf("ID %s was not found in actions! Skipping child. (TRIGGER?)", branch.SourceID)
log.Printf("[INFO] ID %s was not found in actions! Skipping child. (TRIGGER?)", branch.SourceID)
}
}

Expand Down Expand Up @@ -775,6 +847,10 @@ func GetPrioritizedApps(ctx context.Context, user User) ([]WorkflowApp, error) {
err = json.Unmarshal(cacheData, &allApps)
if err == nil {
return allApps, nil
} else {
log.Println(string(cacheData))
log.Printf("Failed unmarshaling apps: %s", err)
log.Printf("DATALEN: %d", len(cacheData))
}
} else {
log.Printf("[INFO] Failed getting cache for apps with KEY %s: %s", cacheKey, err)
Expand All @@ -798,7 +874,7 @@ func GetPrioritizedApps(ctx context.Context, user User) ([]WorkflowApp, error) {
continue
}

log.Printf("[WARNING] No more apps (org)? Breaking: %s.", err)
log.Printf("[WARNING] No more apps for %s in org app load? Breaking: %s.", user.Username, err)
break
}

Expand Down
72 changes: 30 additions & 42 deletions shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ func HandleApiAuthentication(resp http.ResponseWriter, request *http.Request) (U
sessionToken := c.Value
session, err := GetSession(ctx, sessionToken)
if err != nil {
log.Printf("Session %s doesn't exist: %s", session.Session, err)
log.Printf("[INFO] Session %s doesn't exist: %s", session.Session, err)
return User{}, err
}

Expand Down Expand Up @@ -1258,6 +1258,8 @@ func GetWorkflowExecutions(resp http.ResponseWriter, request *http.Request) {
return
}
} else if strings.Contains(fmt.Sprintf("%s", err), "FailedPrecondition") {
//log.Printf("[INFO] Failed precondition in workflowexecs: %s", err)

q = datastore.NewQuery("workflowexecution").Filter("workflow_id =", fileId).Limit(25)
_, err = project.Dbclient.GetAll(ctx, q, &workflowExecutions)
if err != nil {
Expand Down Expand Up @@ -1309,20 +1311,18 @@ func GetWorkflows(resp http.ResponseWriter, request *http.Request) {
var workflows []Workflow

cacheKey := fmt.Sprintf("%s_workflows", user.Id)
if project.CacheDb {
cache, err := GetCache(ctx, cacheKey)
cache, err := GetCache(ctx, cacheKey)
if err == nil {
cacheData := []byte(cache.([]uint8))
//log.Printf("CACHEDATA: %#v", cacheData)
err = json.Unmarshal(cacheData, &workflows)
if err == nil {
cacheData := []byte(cache.([]uint8))
//log.Printf("CACHEDATA: %#v", cacheData)
err = json.Unmarshal(cacheData, &workflows)
if err == nil {
resp.WriteHeader(200)
resp.Write(cacheData)
return
}
} else {
log.Printf("[INFO] Failed getting cache for workflows for user %s", user.Id)
resp.WriteHeader(200)
resp.Write(cacheData)
return
}
} else {
//log.Printf("[INFO] Failed getting cache for workflows for user %s", user.Id)
}

// With user, do a search for workflows with user or user's org attached
Expand Down Expand Up @@ -1411,7 +1411,7 @@ func GetWorkflows(resp http.ResponseWriter, request *http.Request) {
if project.CacheDb {
err = SetCache(ctx, cacheKey, newjson)
if err != nil {
log.Printf("[WARNING] Failed updating workflows: %s", err)
log.Printf("[WARNING] Failed updating workflow cache: %s", err)
}
}

Expand Down Expand Up @@ -1933,15 +1933,17 @@ func SetNewWorkflow(resp http.ResponseWriter, request *http.Request) {
}

// Initialized without functions = adding a hello world node.
log.Printf("WHATS TAKING TIME 1")
if len(newActions) == 0 {
//log.Printf("APPENDING NEW APP FOR NEW WORKFLOW")

// Adds the Testing app if it's a new workflow
workflowapps, err := GetPrioritizedApps(ctx, user)
if err == nil {
// FIXME: Add real env
envName := "Shuffle"
if project.Environment == "cloud" {
envName = "cloud"
}

environments, err := GetEnvironments(ctx, user.ActiveOrg.Id)
if err == nil {
for _, env := range environments {
Expand Down Expand Up @@ -1996,7 +1998,6 @@ func SetNewWorkflow(resp http.ResponseWriter, request *http.Request) {
//}
}

log.Printf("WHATS TAKING TIME 2")
workflow.Actions = []Action{}
for _, item := range workflow.Actions {
oldId := item.ID
Expand Down Expand Up @@ -2066,7 +2067,6 @@ func SetNewWorkflow(resp http.ResponseWriter, request *http.Request) {
workflow.Configuration.ExitOnError = false
workflow.Created = timeNow

log.Printf("WHATS TAKING TIME 3")
workflowjson, err := json.Marshal(workflow)
if err != nil {
log.Printf("Failed workflow json setting marshalling: %s", err)
Expand All @@ -2083,7 +2083,6 @@ func SetNewWorkflow(resp http.ResponseWriter, request *http.Request) {
return
}

log.Printf("WHATS TAKING TIME 4")
cacheKey := fmt.Sprintf("%s_workflows", user.Id)
DeleteCache(ctx, cacheKey)

Expand Down Expand Up @@ -2302,6 +2301,7 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
}
} else if trigger.TriggerType == "USERINPUT" {
// E.g. check email
log.Printf("Validating USERINPUT")
sms := ""
email := ""
triggerType := ""
Expand Down Expand Up @@ -2439,20 +2439,6 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
cacheKey := fmt.Sprintf("%s_workflows", user.Id)
DeleteCache(ctx, cacheKey)
}

// FIXME - more checks here - force reload of data or something
//if len(allNodes) == 0 {
// resp.WriteHeader(401)
// resp.Write([]byte(`{"success": false, "reason": "Please insert a node"}`))
// return
//}

// Allowed with only a start node
//if len(allNodes) != 1 {
// resp.WriteHeader(401)
// resp.Write([]byte(`{"success": false, "reason": "There are nodes with no branches"}`))
// return
//}
}

// FIXME - might be a sploit to run someone elses app if getAllWorkflowApps
Expand Down Expand Up @@ -2484,15 +2470,17 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
}
}

allAuths, err := GetAllWorkflowAppAuth(ctx, user.ActiveOrg.Id)
if userErr != nil {
log.Printf("Api authentication failed in get all apps: %s", userErr)
if workflow.PreviouslySaved {
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
/*
allAuths, err := GetAllWorkflowAppAuth(ctx, user.ActiveOrg.Id)
if userErr != nil {
log.Printf("Api authentication failed in get all apps: %s", userErr)
if workflow.PreviouslySaved {
resp.WriteHeader(401)
resp.Write([]byte(`{"success": false}`))
return
}
}
}
*/

// Check every app action and param to see whether they exist
//log.Printf("PRE ACTIONS 2")
Expand Down Expand Up @@ -2810,7 +2798,7 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {

workflow.Actions = newActions
workflow.IsValid = true
log.Printf("[INFO] Tags: %#v", workflow.Tags)
//log.Printf("[INFO] Tags: %#v", workflow.Tags)

// FIXME: Is this too drastic? May lead to issues in the future.
// Should maybe make a copy for the old org.
Expand Down

0 comments on commit 3c2063e

Please sign in to comment.