diff --git a/.github/configs/wordlist.txt b/.github/configs/wordlist.txt index 0db0a3eae..721224adc 100644 --- a/.github/configs/wordlist.txt +++ b/.github/configs/wordlist.txt @@ -23,6 +23,7 @@ workerNodeNum durations dur ACM +Acyclic addr adservice AdService @@ -151,8 +152,10 @@ Ctrl currencyservice Daglis DAGMode +DAGEntryFunction datacenter Datacenter +DAGs dataflows dataset david @@ -710,6 +713,7 @@ cgroups noop YAMLs cgo +EnableDAGDataset EnableMetricsScrapping EnableZipkinTracing EndpointPort diff --git a/cmd/config_knative_trace.json b/cmd/config_knative_trace.json index 661640fc4..4a39796fb 100644 --- a/cmd/config_knative_trace.json +++ b/cmd/config_knative_trace.json @@ -24,5 +24,9 @@ "GRPCConnectionTimeoutSeconds": 15, "GRPCFunctionTimeoutSeconds": 900, - "DAGMode": false + "DAGMode": false, + "DAGEntryFunction": 0, + "EnableDAGDataset": true, + "Width": 2, + "Depth": 2 } diff --git a/cmd/loader.go b/cmd/loader.go index bb69b80c5..1b8bfa9b8 100644 --- a/cmd/loader.go +++ b/cmd/loader.go @@ -31,11 +31,12 @@ import ( "strings" "time" + "golang.org/x/exp/slices" + "github.com/vhive-serverless/loader/pkg/common" "github.com/vhive-serverless/loader/pkg/config" "github.com/vhive-serverless/loader/pkg/driver" "github.com/vhive-serverless/loader/pkg/trace" - "golang.org/x/exp/slices" log "github.com/sirupsen/logrus" tracer "github.com/vhive-serverless/vSwarm/utils/tracing/go" diff --git a/data/traces/example/dag_structure.csv b/data/traces/example/dag_structure.csv new file mode 100644 index 000000000..0b97432cf --- /dev/null +++ b/data/traces/example/dag_structure.csv @@ -0,0 +1,6 @@ +Width,Width - Percentile,Depth,Depth - Percentile,Total Nodes,Total Nodes +1,0.00%,1,0.00%,2,0.00% +1,78.66%,1,12.51%,2,55.98% +2,92.13%,2,67.96%,3,79.20% +3,95.24%,3,86.84%,4,86.63% +4,100.00%,4,100.00%,5,100.00% diff --git a/data/traces/reference/.gitattributes b/data/traces/reference/.gitattributes index f087b429e..75a4a2e5d 100644 --- a/data/traces/reference/.gitattributes +++ 
b/data/traces/reference/.gitattributes @@ -1 +1,2 @@ *.tar.gz filter=lfs diff=lfs merge=lfs -text +dag_structure.csv filter=lfs diff=lfs merge=lfs -text diff --git a/data/traces/reference/dag_structure.csv b/data/traces/reference/dag_structure.csv new file mode 100644 index 000000000..ae5935843 --- /dev/null +++ b/data/traces/reference/dag_structure.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:664d4d4a1710c292fdf468bc3872653c39f21c97a89e73af5488c4a34c17eb10 +size 76212 diff --git a/docs/configuration.md b/docs/configuration.md index ceb9f31fb..f99b50740 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -30,7 +30,11 @@ | MetricScrapingPeriodSeconds | int | > 0 | 15 | Period of Prometheus metrics scrapping | | GRPCConnectionTimeoutSeconds | int | > 0 | 60 | Timeout for establishing a gRPC connection | | GRPCFunctionTimeoutSeconds | int | > 0 | 90 | Maximum time given to function to execute[^5] | -| DAGMode | bool | true/false | false | Sequential invocation of all functions one after another | +| DAGMode | bool | true/false | false | Generates a dedicated DAG workflow[^6] with a specified function as the entry point. Frequency and IAT of the DAG follows the entry function, while Duration and Memory of each function will follow their respective values in TracePath. | +| DAGEntryFunction | int | >=0 | 0 | The index of the entry function in the DAG. Frequency of the DAG will follow this function. | +| EnableDAGDataset | bool | true/false | true | Generate width and depth from data/traces/example/dag_structure.csv[^7] | +| Width | int | > 0 | 2 | Default width of DAG | +| Depth | int | > 0 | 2 | Default depth of DAG | [^1]: To run RPS experiments add suffix `-RPS`. 
@@ -50,6 +54,10 @@ this [table](https://cloud.google.com/functions/pricing#compute_time) for Google [^5]: Function can execute for at most 15 minutes as in AWS Lambda; https://aws.amazon.com/about-aws/whats-new/2018/10/aws-lambda-supports-functions-that-can-run-up-to-15-minutes/ +[^6]: The generated DAG consists of unique functions. The shape of each DAG is determined either by ```Width,Depth``` or randomly generated based on ```EnableDAGDataset```. + +[^7]: A [data sample](https://github.com/icanforce/Orion-OSDI22/blob/main/Public_Dataset/dag_structure.xlsx) of DAG structures has been created based on past Microsoft Azure traces. Width and Depth are determined based on probabilities of this sample. + + --- InVitro can cause failure on cluster manager components. To do so, please configure the `cmd/failure.json`. Make sure diff --git a/docs/loader.md b/docs/loader.md index a5330238f..ec922529b 100644 --- a/docs/loader.md +++ b/docs/loader.md @@ -179,6 +179,44 @@ For more options, please see the `Makefile`. For instructions on how to use the loader with OpenWhisk go to `openwhisk_setup/README.md`. +## Workflow Invocation +Generation of a Directed Acyclic Graph (DAG) workflow is supported by setting `"DAGMode: true"` in `cmd/config_knative_trace.json` (as specified in [`docs/configuration.md`](../docs/configuration.md)). Before invocation, a single DAG containing unique functions will be generated, with entry function index in `"DAGEntryFunction"`. The invocation frequency of the DAG will follow the specified entry function in `data/traces/example/invocations.csv`. + +The DAG consists of functions, linked in numerical order, until the desired shape is achieved. If the number of functions provided in `data/traces/example` exceeds the required number for DAG generation, the excess functions will be ignored. 
+ + An example of the generated workflow can be seen here: + +DAG of Width = 3, Depth = 4 with `DAGEntryFunction`: 0: + +```bash +f(0) -> f(1) -> f(3) -> f(5) + \ + \ -> f(2) -> f(4) -> f(6) + \ + \ -> f(7) +``` +In this DAG, each invocation will result in 8 total functions invoked, with parallel invocations per branch. Functions not in the DAG will not be invoked. + +To obtain [reference traces](https://github.com/vhive-serverless/invitro/blob/main/docs/sampler.md#reference-traces) for DAG execution, use the following command: +```bash +git lfs pull +tar -xzf data/traces/reference/sampled_150.tar.gz -C data/traces +``` +Microsoft has publicly released Microsoft Azure traces of function invocations from 10/18/2021 to 10/31/2021. From this trace, a [data sample](https://github.com/icanforce/Orion-OSDI22/blob/main/Public_Dataset) of DAG structures, representing the cumulative distribution of width and depth of DAGs during that period, was generated. Probabilities were applied to the data to derive the shape of the DAGs. The file `data/traces/example/dag_structure.csv` provides a simplified sample of the publicly released traces. + +By default, the shape of the DAG is automatically calculated using the above mentioned cumulative distribution. +To manually set the shape of the DAG, change the following parameters in `cmd/config_knative_trace.json`. Note that the number of functions in `TracePath` must be large enough to support the maximum size of the DAG. +```bash +"EnableDAGDataset": false, +"Width": , +"Depth": +``` + +Lastly, start the experiment. This invokes the DAG with the entry function's invocation frequency. 
+```bash +go run cmd/loader.go --config cmd/config_knative_trace.json +``` + ## Running on Cloud Using Serverless Framework **Currently supported vendors:** AWS @@ -204,7 +242,6 @@ For instructions on how to use the loader with OpenWhisk go to `openwhisk_setup/ ```bash go run cmd/loader.go --config cmd/config_knative_trace.json ``` - --- Note: - Current deployment is via container image. diff --git a/pkg/common/trace_types.go b/pkg/common/trace_types.go index 1faedd9b2..05190a784 100644 --- a/pkg/common/trace_types.go +++ b/pkg/common/trace_types.go @@ -24,6 +24,8 @@ package common +import "container/list" + type FunctionInvocationStats struct { HashOwner string HashApp string @@ -103,3 +105,9 @@ type Function struct { Specification *FunctionSpecification } + +type Node struct { + Function *Function + Branches []*list.List + Depth int +} diff --git a/pkg/common/utilities.go b/pkg/common/utilities.go index 94f6f3105..c836fa833 100644 --- a/pkg/common/utilities.go +++ b/pkg/common/utilities.go @@ -135,3 +135,15 @@ func SumNumberOfInvocations(withWarmup bool, totalDuration int, functions []*Fun return result } + +func GetName(function *Function) int { + parts := strings.Split(function.Name, "-") + if parts[0] == "test" { + return 0 + } + functionId, err := strconv.Atoi(parts[2]) + if err != nil { + log.Fatal(err) + } + return functionId +} \ No newline at end of file diff --git a/pkg/config/parser.go b/pkg/config/parser.go index c6015d671..26433ba68 100644 --- a/pkg/config/parser.go +++ b/pkg/config/parser.go @@ -80,6 +80,10 @@ type LoaderConfiguration struct { GRPCConnectionTimeoutSeconds int `json:"GRPCConnectionTimeoutSeconds"` GRPCFunctionTimeoutSeconds int `json:"GRPCFunctionTimeoutSeconds"` DAGMode bool `json:"DAGMode"` + EnableDAGDataset bool `json:"EnableDAGDataset"` + DAGEntryFunction int `json:"DAGEntryFunction"` + Width int `json:"Width"` + Depth int `json:"Depth"` } func ReadConfigurationFile(path string) LoaderConfiguration { diff --git 
a/pkg/config/parser_test.go b/pkg/config/parser_test.go index a4abb4058..b205aadcb 100644 --- a/pkg/config/parser_test.go +++ b/pkg/config/parser_test.go @@ -61,7 +61,12 @@ func TestConfigParser(t *testing.T) { config.MetricScrapingPeriodSeconds != 15 || config.AutoscalingMetric != "concurrency" || config.GRPCConnectionTimeoutSeconds != 15 || - config.GRPCFunctionTimeoutSeconds != 900 { + config.GRPCFunctionTimeoutSeconds != 900 || + config.DAGMode != false || + config.EnableDAGDataset != true || + config.DAGEntryFunction != 0 || + config.Width != 2 || + config.Depth != 2 { t.Error("Unexpected configuration read.") } diff --git a/pkg/config/test_config.json b/pkg/config/test_config.json index 9e6a65e30..e7ffcbff4 100644 --- a/pkg/config/test_config.json +++ b/pkg/config/test_config.json @@ -22,5 +22,9 @@ "GRPCConnectionTimeoutSeconds": 15, "GRPCFunctionTimeoutSeconds": 900, - "DAGMode": false + "DAGMode": false, + "EnableDAGDataset": true, + "DAGEntryFunction": 0, + "Width": 2, + "Depth": 2 } diff --git a/pkg/config/test_config_aws.json b/pkg/config/test_config_aws.json index d1fa17f8b..5abd1dbcd 100644 --- a/pkg/config/test_config_aws.json +++ b/pkg/config/test_config_aws.json @@ -21,5 +21,9 @@ "GRPCConnectionTimeoutSeconds": 15, "GRPCFunctionTimeoutSeconds": 900, - "DAGMode": false -} + "DAGMode": false, + "EnableDAGDataset": true, + "DAGEntryFunction": 0, + "Width": 2, + "Depth": 2 +} \ No newline at end of file diff --git a/pkg/driver/trace_driver.go b/pkg/driver/trace_driver.go index fe83fd42f..ffb6e84fd 100644 --- a/pkg/driver/trace_driver.go +++ b/pkg/driver/trace_driver.go @@ -29,10 +29,6 @@ import ( "encoding/csv" "encoding/json" "fmt" - "github.com/vhive-serverless/loader/pkg/config" - "github.com/vhive-serverless/loader/pkg/driver/clients" - "github.com/vhive-serverless/loader/pkg/driver/deployment" - "github.com/vhive-serverless/loader/pkg/driver/failure" "math" "os" "strconv" @@ -40,6 +36,11 @@ import ( "sync/atomic" "time" + 
"github.com/vhive-serverless/loader/pkg/config" + "github.com/vhive-serverless/loader/pkg/driver/clients" + "github.com/vhive-serverless/loader/pkg/driver/deployment" + "github.com/vhive-serverless/loader/pkg/driver/failure" + "github.com/gocarina/gocsv" log "github.com/sirupsen/logrus" "github.com/vhive-serverless/loader/pkg/common" @@ -93,15 +94,6 @@ func (d *Driver) runCSVWriter(records chan interface{}, filename string, writerD writerDone.Done() } -func DAGCreation(functions []*common.Function) *list.List { - linkedList := list.New() - // Assigning nodes one after another - for _, function := range functions { - linkedList.PushBack(function) - } - return linkedList -} - ///////////////////////////////////////// // METRICS SCRAPPERS ///////////////////////////////////////// @@ -178,6 +170,7 @@ type InvocationMetadata struct { SuccessCount *int64 FailedCount *int64 FailedCountByMinute []int64 + FunctionsInvoked *int64 RecordOutputChannel chan interface{} AnnounceDoneWG *sync.WaitGroup @@ -207,22 +200,39 @@ func (d *Driver) invokeFunction(metadata *InvocationMetadata) { node := metadata.RootFunction.Front() var record *mc.ExecutionRecord var runtimeSpecifications *common.RuntimeSpecification + var branches []*list.List + var invocationRetries int + var numberOfFunctionsInvoked int64 for node != nil { - function := node.Value.(*common.Function) + function := node.Value.(*common.Node).Function runtimeSpecifications = &function.Specification.RuntimeSpecification[metadata.MinuteIndex][metadata.InvocationIndex] - success, record = d.Invoker.Invoke(function, runtimeSpecifications) + if !success && (d.Configuration.LoaderConfiguration.DAGMode && invocationRetries == 0) { + log.Debugf("Invocation failed at minute: %d for %s. 
Retrying Invocation", metadata.MinuteIndex, function.Name) + invocationRetries += 1 + continue + } record.Phase = int(metadata.Phase) record.InvocationID = composeInvocationID(d.Configuration.TraceGranularity, metadata.MinuteIndex, metadata.InvocationIndex) metadata.RecordOutputChannel <- record - + numberOfFunctionsInvoked += 1 if !success { log.Debugf("Invocation failed at minute: %d for %s", metadata.MinuteIndex, function.Name) break } + branches = node.Value.(*common.Node).Branches + for i := 0; i < len(branches); i++ { + newMetadataValue := *metadata + newMetadata := &newMetadataValue + newMetadata.RootFunction = branches[i] + newMetadata.AnnounceDoneWG.Add(1) + atomic.AddInt64(metadata.SuccessCount, -1) + go d.invokeFunction(newMetadata) + } node = node.Next() } + atomic.AddInt64(metadata.FunctionsInvoked, numberOfFunctionsInvoked) if success { atomic.AddInt64(metadata.SuccessCount, 1) } else { @@ -231,11 +241,11 @@ func (d *Driver) invokeFunction(metadata *InvocationMetadata) { } } -func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.WaitGroup, +func (d *Driver) functionsDriver(functionLinkedList *list.List, announceFunctionDone *sync.WaitGroup, addInvocationsToGroup *sync.WaitGroup, readOpenWhiskMetadata *sync.Mutex, totalSuccessful *int64, - totalFailed *int64, totalIssued *int64, recordOutputChannel chan interface{}) { + totalFailed *int64, totalIssued *int64, entriesWritten *int64, recordOutputChannel chan interface{}) { - function := list.Front().Value.(*common.Function) + function := functionLinkedList.Front().Value.(*common.Node).Function numberOfInvocations := 0 for i := 0; i < len(function.InvocationStats.Invocations); i++ { numberOfInvocations += function.InvocationStats.Invocations[i] @@ -251,6 +261,7 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai var failedInvocations int64 var failedInvocationByMinute = make([]int64, totalTraceDuration) var numberOfIssuedInvocations int64 + var 
functionsInvoked int64 var currentPhase = common.ExecutionPhase waitForInvocations := sync.WaitGroup{} @@ -310,13 +321,14 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai waitForInvocations.Add(1) go d.invokeFunction(&InvocationMetadata{ - RootFunction: list, + RootFunction: functionLinkedList, Phase: currentPhase, MinuteIndex: minuteIndex, InvocationIndex: invocationIndex, SuccessCount: &successfulInvocations, FailedCount: &failedInvocations, FailedCountByMinute: failedInvocationByMinute, + FunctionsInvoked: &functionsInvoked, RecordOutputChannel: recordOutputChannel, AnnounceDoneWG: &waitForInvocations, AnnounceDoneExe: addInvocationsToGroup, @@ -331,7 +343,7 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai InvocationID: composeInvocationID(d.Configuration.TraceGranularity, minuteIndex, invocationIndex), StartTime: time.Now().UnixNano(), } - + functionsInvoked++ successfulInvocations++ } numberOfIssuedInvocations++ @@ -347,6 +359,7 @@ func (d *Driver) functionsDriver(list *list.List, announceFunctionDone *sync.Wai atomic.AddInt64(totalSuccessful, successfulInvocations) atomic.AddInt64(totalFailed, failedInvocations) atomic.AddInt64(totalIssued, numberOfIssuedInvocations) + atomic.AddInt64(entriesWritten, functionsInvoked) } func (d *Driver) proceedToNextMinute(function *common.Function, minuteIndex *int, invocationIndex *int, startOfMinute *time.Time, @@ -527,7 +540,7 @@ func (d *Driver) internalRun(iatOnly bool, generated bool) { var successfulInvocations int64 var failedInvocations int64 var invocationsIssued int64 - var functionsPerDAG int64 + var entriesWritten int64 readOpenWhiskMetadata := sync.Mutex{} allFunctionsInvoked := sync.WaitGroup{} allIndividualDriversCompleted := sync.WaitGroup{} @@ -538,10 +551,11 @@ func (d *Driver) internalRun(iatOnly bool, generated bool) { if !iatOnly { log.Info("Generating IAT and runtime specifications for all the functions") + DAGEntryFunctionIndex := 
d.Configuration.LoaderConfiguration.DAGEntryFunction + DAGInvocation := d.Configuration.Functions[DAGEntryFunctionIndex].InvocationStats.Invocations for i, function := range d.Configuration.Functions { - // Equalising all the InvocationStats to the first function if d.Configuration.LoaderConfiguration.DAGMode { - function.InvocationStats.Invocations = d.Configuration.Functions[0].InvocationStats.Invocations + function.InvocationStats.Invocations = DAGInvocation } spec := d.SpecificationGenerator.GenerateInvocationData( function, @@ -549,7 +563,6 @@ func (d *Driver) internalRun(iatOnly bool, generated bool) { d.Configuration.ShiftIAT, d.Configuration.TraceGranularity, ) - d.Configuration.Functions[i].Specification = spec } } @@ -571,35 +584,36 @@ func (d *Driver) internalRun(iatOnly bool, generated bool) { } if d.Configuration.LoaderConfiguration.DAGMode { + functions := d.Configuration.Functions + dagLinkedList := generator.GenerateDAG(d.Configuration.LoaderConfiguration, functions) log.Infof("Starting DAG invocation driver\n") - functionLinkedList := DAGCreation(d.Configuration.Functions) - functionsPerDAG = int64(len(d.Configuration.Functions)) allIndividualDriversCompleted.Add(1) go d.functionsDriver( - functionLinkedList, + dagLinkedList, &allIndividualDriversCompleted, &allFunctionsInvoked, &readOpenWhiskMetadata, &successfulInvocations, &failedInvocations, &invocationsIssued, + &entriesWritten, globalMetricsCollector, ) } else { log.Infof("Starting function invocation driver\n") - functionsPerDAG = 1 for _, function := range d.Configuration.Functions { allIndividualDriversCompleted.Add(1) - linkedList := list.New() - linkedList.PushBack(function) + functionLinkedList := list.New() + functionLinkedList.PushBack(&common.Node{Function: function, Depth: 0}) go d.functionsDriver( - linkedList, + functionLinkedList, &allIndividualDriversCompleted, &allFunctionsInvoked, &readOpenWhiskMetadata, &successfulInvocations, &failedInvocations, &invocationsIssued, + 
&entriesWritten, globalMetricsCollector, ) } @@ -608,7 +622,7 @@ func (d *Driver) internalRun(iatOnly bool, generated bool) { if atomic.LoadInt64(&successfulInvocations)+atomic.LoadInt64(&failedInvocations) != 0 { log.Debugf("Waiting for all the invocations record to be written.\n") - totalIssuedChannel <- atomic.LoadInt64(&invocationsIssued) * functionsPerDAG + totalIssuedChannel <- atomic.LoadInt64(&entriesWritten) scraperFinishCh <- 0 // Ask the scraper to finish metrics collection allRecordsWritten.Wait() diff --git a/pkg/driver/trace_driver_test.go b/pkg/driver/trace_driver_test.go index 61d498080..4b11befae 100644 --- a/pkg/driver/trace_driver_test.go +++ b/pkg/driver/trace_driver_test.go @@ -135,7 +135,7 @@ func TestInvokeFunctionFromDriver(t *testing.T) { testDriver := createTestDriver() var failureCountByMinute = make([]int64, testDriver.Configuration.TraceDuration) - + var functionsInvoked int64 if !test.forceFail { address, port := "localhost", test.port testDriver.Configuration.Functions[0].Endpoint = fmt.Sprintf("%s:%d", address, port) @@ -145,10 +145,10 @@ func TestInvokeFunctionFromDriver(t *testing.T) { // make sure that the gRPC server is running time.Sleep(2 * time.Second) } - + function := testDriver.Configuration.Functions[0] + node := &common.Node{Function: testDriver.Configuration.Functions[0]} list := list.New() - list.PushBack(testDriver.Configuration.Functions[0]) - function := list.Front().Value.(*common.Function) + list.PushBack(node) for i := 0; i < len(function.Specification.RuntimeSpecification); i++ { function.Specification.RuntimeSpecification[i] = make([]common.RuntimeSpecification, 3) } @@ -164,6 +164,7 @@ func TestInvokeFunctionFromDriver(t *testing.T) { SuccessCount: &successCount, FailedCount: &failureCount, FailedCountByMinute: failureCountByMinute, + FunctionsInvoked: &functionsInvoked, RecordOutputChannel: invocationRecordOutputChannel, AnnounceDoneWG: announceDone, } @@ -196,13 +197,13 @@ func TestInvokeFunctionFromDriver(t 
*testing.T) { func TestDAGInvocation(t *testing.T) { var successCount int64 = 0 var failureCount int64 = 0 - var functionsToInvoke int = 4 + var functionsToInvoke int = 3 + var functionsInvoked int64 invocationRecordOutputChannel := make(chan interface{}, functionsToInvoke) announceDone := &sync.WaitGroup{} testDriver := createTestDriver() var failureCountByMinute = make([]int64, testDriver.Configuration.TraceDuration) - list := list.New() address, port := "localhost", 8085 function := testDriver.Configuration.Functions[0] function.Endpoint = fmt.Sprintf("%s:%d", address, port) @@ -215,28 +216,48 @@ func TestDAGInvocation(t *testing.T) { Runtime: 1000, Memory: 128, } - for i := 0; i < functionsToInvoke; i++ { - function = testDriver.Configuration.Functions[0] - list.PushBack(function) + functionList := make([]*common.Function, 3) + for i := 0; i < len(functionList); i++ { + functionList[i] = function + } + originalBranch := []*list.List{ + func() *list.List { + l := list.New() + l.PushBack(&common.Node{Function: functionList[0], Depth: 0}) + l.PushBack(&common.Node{Function: functionList[1], Depth: 1}) + return l + }(), + } + + newBranch := []*list.List{ + func() *list.List { + l := list.New() + l.PushBack(&common.Node{Function: functionList[2], Depth: 1}) + return l + }(), } + rootFunction := originalBranch[0] + rootFunction.Front().Value.(*common.Node).Branches = newBranch time.Sleep(2 * time.Second) metadata := &InvocationMetadata{ - RootFunction: list, + RootFunction: rootFunction, Phase: common.ExecutionPhase, MinuteIndex: 0, InvocationIndex: 2, SuccessCount: &successCount, FailedCount: &failureCount, FailedCountByMinute: failureCountByMinute, + FunctionsInvoked: &functionsInvoked, RecordOutputChannel: invocationRecordOutputChannel, AnnounceDoneWG: announceDone, } announceDone.Add(1) testDriver.invokeFunction(metadata) - if !(successCount == 1 && failureCount == 0) { + announceDone.Wait() + if !(functionsInvoked == 3 && failureCount == 0) { t.Error("The DAG 
invocation has failed.") } for i := 0; i < functionsToInvoke; i++ { diff --git a/pkg/generator/dag_generation.go b/pkg/generator/dag_generation.go new file mode 100644 index 000000000..3fa1f3fbc --- /dev/null +++ b/pkg/generator/dag_generation.go @@ -0,0 +1,223 @@ +package generator + +import ( + "container/list" + "encoding/csv" + "fmt" + "math/rand" + "os" + "strconv" + "strings" + + log "github.com/sirupsen/logrus" + "github.com/vhive-serverless/loader/pkg/common" + "github.com/vhive-serverless/loader/pkg/config" +) + +// Visual Representation for the DAG +func printDAG(DAGWorkflow *list.List) { + DAGNode := DAGWorkflow.Front() + nodeQueue := make([]*list.Element, 0) + nodeQueue = append(nodeQueue, DAGNode) + var printMessage string + var buffer string = "" + var dummyNode *list.Element + var startingNode bool = true + for len(nodeQueue) > 0 { + DAGNode = nodeQueue[0] + nodeQueue = nodeQueue[1:] + functionId := common.GetName(DAGNode.Value.(*common.Node).Function) + if startingNode { + printMessage = "|" + strconv.Itoa(functionId) + for i := 0; i < DAGNode.Value.(*common.Node).Depth; i++ { + buffer += " " + } + printMessage = buffer + printMessage + startingNode = false + } else { + printMessage = printMessage + " -> " + strconv.Itoa(functionId) + } + for i := 0; i < len(DAGNode.Value.(*common.Node).Branches); i++ { + nodeQueue = append(nodeQueue, dummyNode) + copy(nodeQueue[1:], nodeQueue) + nodeQueue[0] = DAGNode.Value.(*common.Node).Branches[i].Front() + } + if DAGNode.Next() == nil { + println(printMessage) + buffer = "" + if len(nodeQueue) > 0 { + startingNode = true + } else { + break + } + } else { + nodeQueue = append(nodeQueue, dummyNode) + copy(nodeQueue[1:], nodeQueue) + nodeQueue[0] = DAGNode.Next() + } + } +} + +// Read the Cumulative Distribution Frequency (CDF) of the widths and depths of a DAG +func generateCDF(file string) [][]float64 { + f, err := os.Open(file) + if err != nil { + log.Fatal(err) + } + defer f.Close() + csvReader := 
csv.NewReader(f) + records, err := csvReader.ReadAll() + if err != nil { + log.Fatal(err) + } + records = records[1:] + cdf := make([][]float64, len(records[0])) + for i := 0; i < len(records[0]); i++ { + cdf[i] = make([]float64, len(records)) + } + for i := 0; i < len(records[0]); i += 2 { + for j := 0; j < len(records); j++ { + cdfProb, _ := strconv.ParseFloat(strings.TrimSuffix(records[j][i+1], "%"), 64) + cdfValue, _ := strconv.ParseFloat(records[j][i], 64) + cdf[i+1][j] = cdfProb + cdf[i][j] = cdfValue + if cdfProb == 100.00 { + cdf[i] = cdf[i][:j+1] + cdf[i+1] = cdf[i+1][:j+1] + break + } + } + } + return cdf +} + +// Generate pseudo-random probabilities and compare it with the given CDF to obtain the depth and width of the DAG +func getDAGStats(cdf [][]float64, maxSize int, numberOfTries int) (int, int) { + var width, depth int + depthProb := rand.Float64() * 100 + widthProb := rand.Float64() * 100 + for i, value := range cdf[1] { + if value >= widthProb { + width = int(cdf[0][i]) + break + } + } + for i, value := range cdf[3] { + if value >= depthProb { + depth = int(cdf[2][i]) + break + } + } + // Re-run DAG Generation if size exceeds number of functions + if maxSize < width*(depth-1)+1 { + if numberOfTries == 10 { + return 1, maxSize + } + width, depth = getDAGStats(cdf, maxSize, numberOfTries+1) + } + return width, depth +} + +func GenerateDAG(config *config.LoaderConfiguration, functions []*common.Function) *list.List { + var width, depth int + dagFunctionIndex := config.DAGEntryFunction + entryFunction := functions[dagFunctionIndex] + if config.EnableDAGDataset { + DAGDistribution := generateCDF(fmt.Sprintf("%s/dag_structure.csv", config.TracePath)) + width, depth = getDAGStats(DAGDistribution, len(functions), 0) + } else { + // Sanity checking if max size of DAG exceeds number of functions available + width = config.Width + depth = config.Depth + if len(functions) < (depth-1)*width+1 { + log.Fatalf("DAG size exceeded: Functions required: %d, Available 
Functions: %d", (depth-1)*width+1, len(functions)) + } + } + functionLinkedList := createDAGWorkflow(functions, entryFunction, width, depth) + printDAG(functionLinkedList) + return functionLinkedList +} + +func createDAGWorkflow(functionList []*common.Function, function *common.Function, maxWidth int, maxDepth int) *list.List { + DAGList := list.New() + if maxDepth == 1 { + DAGList.PushBack(&common.Node{Function: function, Depth: 0}) + return DAGList + } + widthList := generateNodeDistribution(maxWidth, maxDepth) + // Implement a FIFO queue for nodes to assign functions and branches to each node. + nodeQueue := []*list.Element{} + for i := 0; i < len(widthList); i++ { + widthList[i] -= 1 + DAGList.PushBack(&common.Node{Depth: -1}) + } + var functionID int = common.GetName(function) + DAGList.Front().Value = &common.Node{Function: function, Depth: 0} + functionID = (functionID + 1) % len(functionList) + nodeQueue = append(nodeQueue, DAGList.Front()) + for len(nodeQueue) > 0 { + listElement := nodeQueue[0] + nodeQueue = nodeQueue[1:] + node := listElement.Value.(*common.Node) + // Checks if the node has reached the maximum depth of the DAG (maxDepth -1) + if node.Depth == maxDepth-1 { + continue + } + child := &common.Node{Function: functionList[functionID], Depth: node.Depth + 1} + functionID = (functionID + 1) % len(functionList) + listElement.Next().Value = child + nodeQueue = append(nodeQueue, listElement.Next()) + // Creating parallel branches from the node, if width of next stage > width of current stage + var nodeList []*list.List + if widthList[node.Depth+1] > 0 { + nodeList, nodeQueue = addBranches(nodeQueue, widthList, node, functionList, functionID) + functionID = (functionID + len(nodeList)) % len(functionList) + } else { + nodeList = []*list.List{} + } + node.Branches = nodeList + } + return DAGList +} + +func addBranches(nodeQueue []*list.Element, widthList []int, node *common.Node, functionList []*common.Function, functionID int) ([]*list.List, 
[]*list.Element) { + var additionalBranches int + if len(nodeQueue) < 1 || (nodeQueue[0].Value.(*common.Node).Depth > node.Depth) { + additionalBranches = widthList[node.Depth+1] + } else { + additionalBranches = rand.Intn(widthList[node.Depth+1] + 1) + } + for i := node.Depth + 1; i < len(widthList); i++ { + widthList[i] -= additionalBranches + } + nodeList := make([]*list.List, additionalBranches) + for i := 0; i < additionalBranches; i++ { + newBranch := createNewBranch(functionList, node, len(widthList), functionID) + functionID = (functionID + 1) % len(functionList) + nodeList[i] = newBranch + nodeQueue = append(nodeQueue, newBranch.Front()) + } + return nodeList, nodeQueue +} + +func createNewBranch(functionList []*common.Function, node *common.Node, maxDepth int, functionID int) *list.List { + DAGBranch := list.New() + // Ensuring that each node is connected to a child until the maximum depth + for i := node.Depth + 1; i < maxDepth; i++ { + DAGBranch.PushBack(&common.Node{Depth: -1}) + } + child := &common.Node{Function: functionList[functionID], Depth: node.Depth + 1} + DAGBranch.Front().Value = child + return DAGBranch +} + +func generateNodeDistribution(maxWidth int, maxDepth int) []int { + // Generating the number of nodes per depth (stage). 
+ widthList := []int{} + widthList = append(widthList, 1) + for i := 1; i < maxDepth-1; i++ { + widthList = append(widthList, (rand.Intn(maxWidth-widthList[i-1]+1) + widthList[i-1])) + } + widthList = append(widthList, maxWidth) + return widthList +} diff --git a/pkg/generator/dag_generation_test.go b/pkg/generator/dag_generation_test.go new file mode 100644 index 000000000..01af6145e --- /dev/null +++ b/pkg/generator/dag_generation_test.go @@ -0,0 +1,101 @@ +/* + * MIT License + * + * Copyright (c) 2023 EASL and the vHive community + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package generator + +import ( + "testing" + + "github.com/vhive-serverless/loader/pkg/common" + "github.com/vhive-serverless/loader/pkg/config" +) + +var fakeConfig *config.LoaderConfiguration = &config.LoaderConfiguration{ + Platform: "Knative", + InvokeProtocol: "grpc", + TracePath: "data/traces/example", + OutputPathPrefix: "test", + EnableZipkinTracing: true, + GRPCConnectionTimeoutSeconds: 5, + GRPCFunctionTimeoutSeconds: 15, + DAGMode: true, + DAGEntryFunction: 0, + EnableDAGDataset: false, + Width: 2, + Depth: 2, +} + +var functions []*common.Function = []*common.Function{ + { + Name: "test-function", + InvocationStats: &common.FunctionInvocationStats{ + Invocations: []int{ + 5, 5, 5, 5, 5, + 5, 5, 5, 5, 5, + 5, 5, 5, 5, 5, + 5, 5, 5, 5, 5, + }, + }, + RuntimeStats: &common.FunctionRuntimeStats{ + Average: 50, + Count: 100, + Minimum: 0, + Maximum: 100, + Percentile0: 0, + Percentile1: 1, + Percentile25: 25, + Percentile50: 50, + Percentile75: 75, + Percentile99: 99, + Percentile100: 100, + }, + MemoryStats: &common.FunctionMemoryStats{ + Average: 5000, + Count: 100, + Percentile1: 100, + Percentile5: 500, + Percentile25: 2500, + Percentile50: 5000, + Percentile75: 7500, + Percentile95: 9500, + Percentile99: 9900, + Percentile100: 10000, + }, + Specification: &common.FunctionSpecification{ + RuntimeSpecification: make([][]common.RuntimeSpecification, 1), + }, + }, +} +var functionList []*common.Function = make([]*common.Function, 3) + +func TestGenerateDAG(t *testing.T) { + for i := 0; i < len(functionList); i++ { + functionList[i] = functions[0] + } + dagList := GenerateDAG(fakeConfig, functionList) + branch := dagList.Front().Value.(*common.Node).Branches + if dagList.Len() != 2 && len(branch) != 1 { + t.Error("Invalid DAG Generated") + } +}