From 3c2063e6967babc0e946ed696b513a2d6b40531b Mon Sep 17 00:00:00 2001
From: frikky
Date: Thu, 25 Mar 2021 19:51:35 +0100
Subject: [PATCH] Minor fixes for cache updates

---
 db-connector.go | 116 +++++++++++++++++++++++++++++++++++++++---------
 shared.go       |  72 +++++++++++++-----------------
 2 files changed, 126 insertions(+), 62 deletions(-)

diff --git a/db-connector.go b/db-connector.go
index f9024d2..4fdb914 100644
--- a/db-connector.go
+++ b/db-connector.go
@@ -18,6 +18,10 @@ import (
 var err error
 var requestCache *cache.Cache
 
+var maxCacheSize = 1020000
+
+//var maxCacheSize = 2000000
+
 // Cache handlers
 func DeleteCache(ctx context.Context, name string) error {
 	if project.Environment == "cloud" {
@@ -29,7 +33,7 @@ func DeleteCache(ctx context.Context, name string) error {
 		return errors.New(fmt.Sprintf("No cache handler for environment %s yet WHILE DELETING", project.Environment))
 	}
 
-	return errors.New(fmt.Sprintf("No cache found for %s", name))
+	return errors.New(fmt.Sprintf("No cache found for %s when DELETING cache", name))
 }
 
 // Cache handlers
@@ -39,7 +43,32 @@ func GetCache(ctx context.Context, name string) (interface{}, error) {
 		} else if err != nil {
 			return "", errors.New(fmt.Sprintf("Failed getting CLOUD cache for %s: %s", name, err))
 		} else {
-			return item.Value, nil
+			// Loops if cachesize is more than max allowed in memcache (multikey)
+			if len(item.Value) == maxCacheSize {
+				totalData := item.Value
+				keyCount := 1
+				keyname := fmt.Sprintf("%s_%d", name, keyCount)
+				for {
+					if item, err := memcache.Get(ctx, keyname); err == memcache.ErrCacheMiss {
+						break
+					} else {
+						totalData = append(totalData, item.Value...)
+
+						//log.Printf("%d - %d = ", len(item.Value), maxCacheSize)
+						if len(item.Value) != maxCacheSize {
+							break
+						}
+					}
+
+					keyCount += 1
+					keyname = fmt.Sprintf("%s_%d", name, keyCount)
+				}
+
+				log.Printf("[INFO] CACHE: TOTAL SIZE FOR %s: %d", name, len(totalData))
+				return totalData, nil
+			} else {
+				return item.Value, nil
+			}
 		}
 	} else if project.Environment == "onprem" {
 		//log.Printf("[INFO] GETTING CACHE FOR %s ONPREM", name)
@@ -60,23 +89,66 @@ func SetCache(ctx context.Context, name string, data []byte) error {
 	// Maxsize ish~
 	if project.Environment == "cloud" {
-		maxSize := 1020000
+		if len(data) > maxCacheSize*10 {
+			return errors.New(fmt.Sprintf("Couldn't set cache for %s - too large: %d > %d", name, len(data), maxCacheSize*10))
+		}
 
 		loop := false
-		if len(data) > maxSize {
+		if len(data) > maxCacheSize {
 			loop = true
 			//log.Printf("Should make multiple cache items for %s", name)
-			return errors.New(fmt.Sprintf("Couldn't set cache for %s - too large: %d > %d", name, len(data), maxSize))
 		}
 
-		_ = loop
-		item := &memcache.Item{
-			Key:        name,
-			Value:      data,
-			Expiration: time.Minute * 30,
-		}
+		// Custom for larger sizes. Max is maxSize*10 when being set
+		if loop {
+			currentChunk := 0
+			keyAmount := 0
+			totalAdded := 0
+			chunkSize := maxCacheSize
+			nextStep := chunkSize
+			keyname := name
+
+			for {
+				if len(data) < nextStep {
+					nextStep = len(data)
+				}
 
-		if err := memcache.Set(ctx, item); err != nil {
-			log.Printf("[WARNING] Failed setting cache for %s: %s", name, err)
+				//log.Printf("%d - %d = ", currentChunk, nextStep)
+				parsedData := data[currentChunk:nextStep]
+				item := &memcache.Item{
+					Key:        keyname,
+					Value:      parsedData,
+					Expiration: time.Minute * 30,
+				}
+
+				if err := memcache.Set(ctx, item); err != nil {
+					log.Printf("[WARNING] Failed setting cache for %s: %s", keyname, err)
+					break
+				} else {
+					totalAdded += chunkSize
+					currentChunk = nextStep
+					nextStep += chunkSize
+
+					keyAmount += 1
+					//log.Printf("%s: %d: %d", keyname, totalAdded, len(data))
+
+					keyname = fmt.Sprintf("%s_%d", name, keyAmount)
+					if totalAdded > len(data) {
+						break
+					}
+				}
+			}
+
+			log.Printf("[INFO] Set app cache with length %d and %d keys", len(data), keyAmount)
+		} else {
+			item := &memcache.Item{
+				Key:        name,
+				Value:      data,
+				Expiration: time.Minute * 30,
+			}
+
+			if err := memcache.Set(ctx, item); err != nil {
+				log.Printf("[WARNING] Failed setting cache for %s: %s", name, err)
+			}
 		}
 
 		return nil
@@ -117,21 +189,21 @@ func SetWorkflowAppDatastore(ctx context.Context, workflowapp WorkflowApp, id st
 func SetWorkflowExecution(ctx context.Context, workflowExecution WorkflowExecution, dbSave bool) error {
 	//log.Printf("\n\n\nRESULT: %s\n\n\n", workflowExecution.Status)
 	if len(workflowExecution.ExecutionId) == 0 {
-		log.Printf("Workflowexeciton executionId can't be empty.")
+		log.Printf("[WARNING] WorkflowExecution executionId can't be empty.")
 		return errors.New("ExecutionId can't be empty.")
 	}
 
 	cacheKey := fmt.Sprintf("workflowexecution-%s", workflowExecution.ExecutionId)
 	executionData, err := json.Marshal(workflowExecution)
 	if err != nil {
-		log.Printf("[WARNING] Failed marshalling execution: %s", err)
+		log.Printf("[WARNING] Failed marshalling execution for cache: %s", err)
 		err = SetCache(ctx, cacheKey, executionData)
 		if err != nil {
-			log.Printf("[WARNING] Failed updating execution: %s", err)
+			log.Printf("[WARNING] Failed updating execution cache: %s", err)
 		}
 	} else {
-		log.Printf("[WARNING] Failed to set execution cache for workflow.")
+		log.Printf("[INFO] Set execution cache for workflowexecution %s", cacheKey)
 	}
 
 	//requestCache.Set(cacheKey, &workflowExecution, cache.DefaultExpiration)
 
@@ -238,13 +310,13 @@ func SetInitExecutionVariables(ctx context.Context, workflowExecution WorkflowEx
 		if sourceFound {
 			parents[branch.DestinationID] = append(parents[branch.DestinationID], branch.SourceID)
 		} else {
-			log.Printf("ID %s was not found in actions! Skipping parent. (TRIGGER?)", branch.SourceID)
+			log.Printf("[INFO] ID %s was not found in actions! Skipping parent. (TRIGGER?)", branch.SourceID)
 		}
 
 		if destinationFound {
 			children[branch.SourceID] = append(children[branch.SourceID], branch.DestinationID)
 		} else {
-			log.Printf("ID %s was not found in actions! Skipping child. (TRIGGER?)", branch.SourceID)
+			log.Printf("[INFO] ID %s was not found in actions! Skipping child. (TRIGGER?)", branch.SourceID)
 		}
 	}
 
@@ -775,6 +847,10 @@ func GetPrioritizedApps(ctx context.Context, user User) ([]WorkflowApp, error) {
 		err = json.Unmarshal(cacheData, &allApps)
 		if err == nil {
 			return allApps, nil
+		} else {
+			log.Println(string(cacheData))
+			log.Printf("Failed unmarshaling apps: %s", err)
+			log.Printf("DATALEN: %d", len(cacheData))
 		}
 	} else {
 		log.Printf("[INFO] Failed getting cache for apps with KEY %s: %s", cacheKey, err)
 	}
@@ -798,7 +874,7 @@ func GetPrioritizedApps(ctx context.Context, user User) ([]WorkflowApp, error) {
 			continue
 		}
 
-		log.Printf("[WARNING] No more apps (org)? Breaking: %s.", err)
+		log.Printf("[WARNING] No more apps for %s in org app load? Breaking: %s.", user.Username, err)
 		break
 	}
 
diff --git a/shared.go b/shared.go
index efa9c41..76b4935 100644
--- a/shared.go
+++ b/shared.go
@@ -836,7 +836,7 @@ func HandleApiAuthentication(resp http.ResponseWriter, request *http.Request) (U
 		sessionToken := c.Value
 		session, err := GetSession(ctx, sessionToken)
 		if err != nil {
-			log.Printf("Session %s doesn't exist: %s", session.Session, err)
+			log.Printf("[INFO] Session %s doesn't exist: %s", session.Session, err)
 			return User{}, err
 		}
 
@@ -1258,6 +1258,8 @@ func GetWorkflowExecutions(resp http.ResponseWriter, request *http.Request) {
 			return
 		}
 	} else if strings.Contains(fmt.Sprintf("%s", err), "FailedPrecondition") {
+		//log.Printf("[INFO] Failed precondition in workflowexecs: %s", err)
+
 		q = datastore.NewQuery("workflowexecution").Filter("workflow_id =", fileId).Limit(25)
 		_, err = project.Dbclient.GetAll(ctx, q, &workflowExecutions)
 		if err != nil {
@@ -1309,20 +1311,18 @@ func GetWorkflows(resp http.ResponseWriter, request *http.Request) {
 	var workflows []Workflow
 
 	cacheKey := fmt.Sprintf("%s_workflows", user.Id)
-	if project.CacheDb {
-		cache, err := GetCache(ctx, cacheKey)
+	cache, err := GetCache(ctx, cacheKey)
+	if err == nil {
+		cacheData := []byte(cache.([]uint8))
+		//log.Printf("CACHEDATA: %#v", cacheData)
+		err = json.Unmarshal(cacheData, &workflows)
 		if err == nil {
-			cacheData := []byte(cache.([]uint8))
-			//log.Printf("CACHEDATA: %#v", cacheData)
-			err = json.Unmarshal(cacheData, &workflows)
-			if err == nil {
-				resp.WriteHeader(200)
-				resp.Write(cacheData)
-				return
-			}
-		} else {
-			log.Printf("[INFO] Failed getting cache for workflows for user %s", user.Id)
+			resp.WriteHeader(200)
+			resp.Write(cacheData)
+			return
 		}
+	} else {
+		//log.Printf("[INFO] Failed getting cache for workflows for user %s", user.Id)
 	}
 
 	// With user, do a search for workflows with user or user's org attached
@@ -1411,7 +1411,7 @@ func GetWorkflows(resp http.ResponseWriter, request *http.Request) {
 	if project.CacheDb {
 		err = SetCache(ctx, cacheKey, newjson)
 		if err != nil {
-			log.Printf("[WARNING] Failed updating workflows: %s", err)
+			log.Printf("[WARNING] Failed updating workflow cache: %s", err)
 		}
 	}
 
@@ -1933,15 +1933,17 @@ func SetNewWorkflow(resp http.ResponseWriter, request *http.Request) {
 	}
 
 	// Initialized without functions = adding a hello world node.
-	log.Printf("WHATS TAKING TIME 1")
 	if len(newActions) == 0 {
 		//log.Printf("APPENDING NEW APP FOR NEW WORKFLOW")
 
 		// Adds the Testing app if it's a new workflow
 		workflowapps, err := GetPrioritizedApps(ctx, user)
 		if err == nil {
-			// FIXME: Add real env
 			envName := "Shuffle"
+			if project.Environment == "cloud" {
+				envName = "cloud"
+			}
+
 			environments, err := GetEnvironments(ctx, user.ActiveOrg.Id)
 			if err == nil {
 				for _, env := range environments {
@@ -1996,7 +1998,6 @@ func SetNewWorkflow(resp http.ResponseWriter, request *http.Request) {
 		//}
 	}
 
-	log.Printf("WHATS TAKING TIME 2")
 	workflow.Actions = []Action{}
 	for _, item := range workflow.Actions {
 		oldId := item.ID
@@ -2066,7 +2067,6 @@
 	workflow.Configuration.ExitOnError = false
 	workflow.Created = timeNow
 
-	log.Printf("WHATS TAKING TIME 3")
 	workflowjson, err := json.Marshal(workflow)
 	if err != nil {
 		log.Printf("Failed workflow json setting marshalling: %s", err)
@@ -2083,7 +2083,6 @@
 		return
 	}
 
-	log.Printf("WHATS TAKING TIME 4")
 	cacheKey := fmt.Sprintf("%s_workflows", user.Id)
 	DeleteCache(ctx, cacheKey)
 
@@ -2302,6 +2301,7 @@ func SaveWorkflow(resp http.ResponseWriter, request *http.Request) {
 			}
 		} else if trigger.TriggerType == "USERINPUT" {
 			// E.g. check email
+			log.Printf("Validating USERINPUT")
 			sms := ""
 			email := ""
 			triggerType := ""
@@ -2439,20 +2439,6 @@
 			cacheKey := fmt.Sprintf("%s_workflows", user.Id)
 			DeleteCache(ctx, cacheKey)
 		}
-
-		// FIXME - more checks here - force reload of data or something
-		//if len(allNodes) == 0 {
-		//	resp.WriteHeader(401)
-		//	resp.Write([]byte(`{"success": false, "reason": "Please insert a node"}`))
-		//	return
-		//}
-
-		// Allowed with only a start node
-		//if len(allNodes) != 1 {
-		//	resp.WriteHeader(401)
-		//	resp.Write([]byte(`{"success": false, "reason": "There are nodes with no branches"}`))
-		//	return
-		//}
 	}
 
 	// FIXME - might be a sploit to run someone elses app if getAllWorkflowApps
@@ -2484,15 +2470,17 @@
 		}
 	}
 
-	allAuths, err := GetAllWorkflowAppAuth(ctx, user.ActiveOrg.Id)
-	if userErr != nil {
-		log.Printf("Api authentication failed in get all apps: %s", userErr)
-		if workflow.PreviouslySaved {
-			resp.WriteHeader(401)
-			resp.Write([]byte(`{"success": false}`))
-			return
-		}
-	}
+	/*
+		allAuths, err := GetAllWorkflowAppAuth(ctx, user.ActiveOrg.Id)
+		if userErr != nil {
+			log.Printf("Api authentication failed in get all apps: %s", userErr)
+			if workflow.PreviouslySaved {
+				resp.WriteHeader(401)
+				resp.Write([]byte(`{"success": false}`))
+				return
+			}
+		}
+	*/
 
 	// Check every app action and param to see whether they exist
 	//log.Printf("PRE ACTIONS 2")
@@ -2810,7 +2798,7 @@
 	workflow.Actions = newActions
 	workflow.IsValid = true
 
-	log.Printf("[INFO] Tags: %#v", workflow.Tags)
+	//log.Printf("[INFO] Tags: %#v", workflow.Tags)
 
 	// FIXME: Is this too drastic? May lead to issues in the future.
 	// Should maybe make a copy for the old org.