Skip to content

Commit

Permalink
Add the init input for pipeline to keep the parameter information. (#178
Browse files Browse the repository at this point in the history
)

Signed-off-by: zhlsunshine <[email protected]>
  • Loading branch information
zhlsunshine authored Jul 19, 2024
1 parent b25c0bb commit e25a1f8
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 20 deletions.
43 changes: 26 additions & 17 deletions microservices-connector/cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,18 @@ func isSuccessFul(statusCode int) bool {
return false
}

func pickupRouteByCondition(rawInput []byte, condition string) bool {
func pickupRouteByCondition(initInput []byte, condition string) bool {
//sample config supported by gjson
//"instances" : [
// {"model_id", "1"},
// ]
// sample condition support by gjson query: "instances.#(modelId==\"1\")""
if !gjson.ValidBytes(rawInput) {
fmt.Println("the rawInput json format is invalid")
if !gjson.ValidBytes(initInput) {
fmt.Println("the initInput json format is invalid")
return false
}

if gjson.GetBytes(rawInput, condition).Exists() {
if gjson.GetBytes(initInput, condition).Exists() {
return true
}
// ' and # will define a gjson query
Expand All @@ -104,7 +104,7 @@ func pickupRouteByCondition(rawInput []byte, condition string) bool {
} else {
key := strings.TrimSpace(condition[:index])
value := strings.TrimSpace(condition[index+2:])
v := gjson.GetBytes(rawInput, key).String()
v := gjson.GetBytes(initInput, key).String()
if v == value {
return true
}
Expand Down Expand Up @@ -184,12 +184,13 @@ func getServiceURLByStepTarget(step *mcv1alpha3.Step, svcNameSpace string) strin
func executeStep(
step *mcv1alpha3.Step,
graph mcv1alpha3.GMConnector,
initInput []byte,
input []byte,
headers http.Header,
) ([]byte, int, error) {
if step.NodeName != "" {
// when nodeName is specified make a recursive call for routing to next step
return routeStep(step.NodeName, graph, input, headers)
return routeStep(step.NodeName, graph, initInput, input, headers)
}
serviceURL := getServiceURLByStepTarget(step, graph.Namespace)
return callService(step, serviceURL, input, headers)
Expand All @@ -198,6 +199,7 @@ func executeStep(
func handleSwitchNode(
route *mcv1alpha3.Step,
graph mcv1alpha3.GMConnector,
initInput []byte,
request []byte,
headers http.Header,
) ([]byte, int, error) {
Expand All @@ -209,7 +211,7 @@ func handleSwitchNode(
stepType = ServiceNode
}
log.Info("Starting execution of step", "type", stepType, "stepName", route.StepName)
if responseBytes, statusCode, err = executeStep(route, graph, request, headers); err != nil {
if responseBytes, statusCode, err = executeStep(route, graph, initInput, request, headers); err != nil {
return nil, 500, err
}

Expand All @@ -227,6 +229,7 @@ func handleSwitchNode(

func handleSwitchPipeline(nodeName string,
graph mcv1alpha3.GMConnector,
initInput []byte,
input []byte,
headers http.Header,
) ([]byte, int, error) {
Expand All @@ -250,13 +253,13 @@ func handleSwitchPipeline(nodeName string,
request = responseBytes
}
if route.Condition == "" {
responseBytes, statusCode, err = handleSwitchNode(&route, graph, request, headers)
responseBytes, statusCode, err = handleSwitchNode(&route, graph, initInput, request, headers)
if err != nil {
return responseBytes, statusCode, err
}
} else {
if pickupRouteByCondition(input, route.Condition) {
responseBytes, statusCode, err = handleSwitchNode(&route, graph, request, headers)
if pickupRouteByCondition(initInput, route.Condition) {
responseBytes, statusCode, err = handleSwitchNode(&route, graph, initInput, request, headers)
if err != nil {
return responseBytes, statusCode, err
}
Expand All @@ -269,6 +272,7 @@ func handleSwitchPipeline(nodeName string,

func handleEnsemblePipeline(nodeName string,
graph mcv1alpha3.GMConnector,
initInput []byte,
input []byte,
headers http.Header,
) ([]byte, int, error) {
Expand All @@ -285,7 +289,7 @@ func handleEnsemblePipeline(nodeName string,
resultChan := make(chan EnsembleStepOutput)
ensembleRes[i] = resultChan
go func() {
output, statusCode, err := executeStep(step, graph, input, headers)
output, statusCode, err := executeStep(step, graph, initInput, input, headers)
if err == nil {
var res map[string]interface{}
if err = json.Unmarshal(output, &res); err == nil {
Expand Down Expand Up @@ -333,6 +337,7 @@ func handleEnsemblePipeline(nodeName string,

func handleSequencePipeline(nodeName string,
graph mcv1alpha3.GMConnector,
initInput []byte,
input []byte,
headers http.Header,
) ([]byte, int, error) {
Expand Down Expand Up @@ -370,7 +375,7 @@ func handleSequencePipeline(nodeName string,
return responseBytes, 500, nil
}
}
if responseBytes, statusCode, err = executeStep(step, graph, request, headers); err != nil {
if responseBytes, statusCode, err = executeStep(step, graph, initInput, request, headers); err != nil {
return nil, 500, err
}
log.Info("Print Response Bytes", "Response Bytes", responseBytes, "Status Code", statusCode)
Expand All @@ -394,20 +399,24 @@ func handleSequencePipeline(nodeName string,
return responseBytes, statusCode, nil
}

func routeStep(nodeName string, graph mcv1alpha3.GMConnector, input []byte, headers http.Header) ([]byte, int, error) {
func routeStep(nodeName string,
graph mcv1alpha3.GMConnector,
initInput, input []byte,
headers http.Header,
) ([]byte, int, error) {
defer timeTrack(time.Now(), "node", nodeName)
currentNode := graph.Spec.Nodes[nodeName]

if currentNode.RouterType == mcv1alpha3.Switch {
return handleSwitchPipeline(nodeName, graph, input, headers)
return handleSwitchPipeline(nodeName, graph, initInput, input, headers)
}

if currentNode.RouterType == mcv1alpha3.Ensemble {
return handleEnsemblePipeline(nodeName, graph, input, headers)
return handleEnsemblePipeline(nodeName, graph, initInput, input, headers)
}

if currentNode.RouterType == mcv1alpha3.Sequence {
return handleSequencePipeline(nodeName, graph, input, headers)
return handleSequencePipeline(nodeName, graph, initInput, input, headers)
}
log.Error(nil, "invalid route type", "type", currentNode.RouterType)
return nil, 500, fmt.Errorf("invalid route type: %v", currentNode.RouterType)
Expand All @@ -422,7 +431,7 @@ func mcGraphHandler(w http.ResponseWriter, req *http.Request) {
defer close(done)

inputBytes, _ := io.ReadAll(req.Body)
response, statusCode, err := routeStep(defaultNodeName, *mcGraph, inputBytes, req.Header)
response, statusCode, err := routeStep(defaultNodeName, *mcGraph, inputBytes, inputBytes, req.Header)

if err != nil {
log.Error(err, "failed to process request")
Expand Down
6 changes: 3 additions & 3 deletions microservices-connector/cmd/router/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestSimpleModelChainer(t *testing.T) {
"Authorization": {"Bearer Token"},
}

res, _, err := routeStep("root", gmcGraph, jsonBytes, headers)
res, _, err := routeStep("root", gmcGraph, jsonBytes, jsonBytes, headers)
if err != nil {
return
}
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestSimpleServiceEnsemble(t *testing.T) {
headers := http.Header{
"Authorization": {"Bearer Token"},
}
res, _, err := routeStep("root", gmcGraph, jsonBytes, headers)
res, _, err := routeStep("root", gmcGraph, jsonBytes, jsonBytes, headers)
if err != nil {
return
}
Expand Down Expand Up @@ -446,7 +446,7 @@ func TestMCWithCondition(t *testing.T) {
headers := http.Header{
"Authorization": {"Bearer Token"},
}
res, _, err := routeStep("root", gmcGraph, jsonBytes, headers)
res, _, err := routeStep("root", gmcGraph, jsonBytes, jsonBytes, headers)
if err != nil {
return
}
Expand Down

0 comments on commit e25a1f8

Please sign in to comment.