From 68a2011bda1fb328e2b35cc6de3179f47d7fb202 Mon Sep 17 00:00:00 2001 From: Steve Zhang Date: Wed, 28 Aug 2024 13:49:31 +0800 Subject: [PATCH] improve the performance of GMC router. (#356) Signed-off-by: zhlsunshine --- microservices-connector/cmd/router/main.go | 49 ++++++++++++---------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/microservices-connector/cmd/router/main.go b/microservices-connector/cmd/router/main.go index 73e8aecd..eacf84db 100644 --- a/microservices-connector/cmd/router/main.go +++ b/microservices-connector/cmd/router/main.go @@ -38,24 +38,32 @@ import ( flag "github.com/spf13/pflag" ) +const ( + BufferSize = 1024 + MaxGoroutines = 1024 + ServiceURL = "serviceUrl" + ServiceNode = "node" + DataPrep = "DataPrep" + Parameters = "parameters" +) + var ( jsonGraph = flag.String("graph-json", "", "serialized json graph def") log = logf.Log.WithName("GMCGraphRouter") mcGraph *mcv1alpha3.GMConnector defaultNodeName = "root" - Prefix = []byte("data: b'") - Suffix = []byte("'\n\n") - DONE = []byte("[DONE]") - Newline = []byte("\n") -) - -const ( - BufferSize = 1024 - ServiceURL = "serviceUrl" - ServiceNode = "node" - DataPrep = "DataPrep" - Parameters = "parameters" - Llm = "Llm" + semaphore = make(chan struct{}, MaxGoroutines) + transport = &http.Transport{ + MaxIdleConns: 1000, + MaxIdleConnsPerHost: 100, + IdleConnTimeout: 2 * time.Minute, + TLSHandshakeTimeout: time.Minute, + ExpectContinueTimeout: 30 * time.Second, + } + callClient = &http.Client{ + Transport: transport, + Timeout: 30 * time.Second, + } ) type EnsembleStepOutput struct { @@ -151,6 +159,9 @@ func callService( input []byte, headers http.Header, ) (io.ReadCloser, int, error) { + semaphore <- struct{}{} + defer func() { <-semaphore }() + defer timeTrack(time.Now(), "step", serviceUrl) log.Info("Entering callService", "url", serviceUrl) @@ -164,6 +175,7 @@ func callService( return nil, 400, err } } + req, err := http.NewRequest("POST", serviceUrl, bytes.NewBuffer(input)) if err != nil { log.Error(err, "An error occurred while preparing request object with serviceUrl.", "serviceUrl", serviceUrl) @@ -173,8 +185,8 @@ func callService( if val := req.Header.Get("Content-Type"); val == "" { req.Header.Add("Content-Type", "application/json") } - resp, err := http.DefaultClient.Do(req) + resp, err := callClient.Do(req) if err != nil { log.Error(err, "An error has occurred while calling service", "service", serviceUrl) return nil, 500, err @@ -561,15 +573,6 @@ func mcGraphHandler(w http.ResponseWriter, req *http.Request) { break } - /*sliceBF := buffer[:n] - if !bytes.HasPrefix(sliceBF, DONE) { - sliceBF = bytes.TrimPrefix(sliceBF, Prefix) - sliceBF = bytes.TrimSuffix(sliceBF, Suffix) - } else { - sliceBF = bytes.Join([][]byte{Newline, sliceBF}, nil) - } - log.Info("[llm - chat_stream] chunk:", "Buffer", string(sliceBF))*/ - // Write the chunk to the ResponseWriter if _, err := w.Write(buffer[:n]); err != nil { log.Error(err, "failed to write to ResponseWriter")