Skip to content

Commit

Permalink
improve the performance of GMC router. (#356)
Browse files Browse the repository at this point in the history
Signed-off-by: zhlsunshine <[email protected]>
  • Loading branch information
zhlsunshine authored Aug 28, 2024
1 parent b9bc034 commit 68a2011
Showing 1 changed file with 26 additions and 23 deletions.
49 changes: 26 additions & 23 deletions microservices-connector/cmd/router/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,32 @@ import (
flag "github.com/spf13/pflag"
)

const (
BufferSize = 1024
MaxGoroutines = 1024
ServiceURL = "serviceUrl"
ServiceNode = "node"
DataPrep = "DataPrep"
Parameters = "parameters"
)

var (
jsonGraph = flag.String("graph-json", "", "serialized json graph def")
log = logf.Log.WithName("GMCGraphRouter")
mcGraph *mcv1alpha3.GMConnector
defaultNodeName = "root"
Prefix = []byte("data: b'")
Suffix = []byte("'\n\n")
DONE = []byte("[DONE]")
Newline = []byte("\n")
)

const (
BufferSize = 1024
ServiceURL = "serviceUrl"
ServiceNode = "node"
DataPrep = "DataPrep"
Parameters = "parameters"
Llm = "Llm"
semaphore = make(chan struct{}, MaxGoroutines)
transport = &http.Transport{
MaxIdleConns: 1000,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 2 * time.Minute,
TLSHandshakeTimeout: time.Minute,
ExpectContinueTimeout: 30 * time.Second,
}
callClient = &http.Client{
Transport: transport,
Timeout: 30 * time.Second,
}
)

type EnsembleStepOutput struct {
Expand Down Expand Up @@ -151,6 +159,9 @@ func callService(
input []byte,
headers http.Header,
) (io.ReadCloser, int, error) {
semaphore <- struct{}{}
defer func() { <-semaphore }()

defer timeTrack(time.Now(), "step", serviceUrl)
log.Info("Entering callService", "url", serviceUrl)

Expand All @@ -164,6 +175,7 @@ func callService(
return nil, 400, err
}
}

req, err := http.NewRequest("POST", serviceUrl, bytes.NewBuffer(input))
if err != nil {
log.Error(err, "An error occurred while preparing request object with serviceUrl.", "serviceUrl", serviceUrl)
Expand All @@ -173,8 +185,8 @@ func callService(
if val := req.Header.Get("Content-Type"); val == "" {
req.Header.Add("Content-Type", "application/json")
}
resp, err := http.DefaultClient.Do(req)

resp, err := callClient.Do(req)
if err != nil {
log.Error(err, "An error has occurred while calling service", "service", serviceUrl)
return nil, 500, err
Expand Down Expand Up @@ -561,15 +573,6 @@ func mcGraphHandler(w http.ResponseWriter, req *http.Request) {
break
}

/*sliceBF := buffer[:n]
if !bytes.HasPrefix(sliceBF, DONE) {
sliceBF = bytes.TrimPrefix(sliceBF, Prefix)
sliceBF = bytes.TrimSuffix(sliceBF, Suffix)
} else {
sliceBF = bytes.Join([][]byte{Newline, sliceBF}, nil)
}
log.Info("[llm - chat_stream] chunk:", "Buffer", string(sliceBF))*/

// Write the chunk to the ResponseWriter
if _, err := w.Write(buffer[:n]); err != nil {
log.Error(err, "failed to write to ResponseWriter")
Expand Down

0 comments on commit 68a2011

Please sign in to comment.