Skip to content

Commit

Permalink
chore: close http responses in a way to allow the Transport to re-use…
Browse files Browse the repository at this point in the history
… the TCP connection (#2718)
  • Loading branch information
atzoum authored and achettyiitr committed Dec 2, 2022
1 parent 463533e commit 11d6123
Show file tree
Hide file tree
Showing 37 changed files with 175 additions and 98 deletions.
69 changes: 36 additions & 33 deletions cmd/devtool/commands/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -64,41 +65,43 @@ func EventSend(c *cli.Context) error {
}

for i := 0; i < c.Int("count"); i++ {
anonymousId := uuid.New().String()

buf := bytes.NewBuffer(nil)
err = t.Execute(buf, map[string]string{
"AnonymousId": anonymousId,
"Timestamp": time.Now().Format(time.RFC3339),
})
if err != nil {
return err
}

req, err := http.NewRequestWithContext(c.Context, "POST", url, buf)
if err != nil {
if err := func() error {
anonymousId := uuid.New().String()
buf := bytes.NewBuffer(nil)
err = t.Execute(buf, map[string]string{
"AnonymousId": anonymousId,
"Timestamp": time.Now().Format(time.RFC3339),
})
if err != nil {
return err
}

req, err := http.NewRequestWithContext(c.Context, "POST", url, buf)
if err != nil {
return err
}

req.SetBasicAuth(c.String("write-key"), "")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("AnonymousId", anonymousId)

resp, err := client.Do(req)
if err != nil {
return err
}
defer func() { httputil.CloseResponse(resp) }()
b, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
fmt.Printf("%s\n%s\n", resp.Status, b)
return fmt.Errorf("status code: %d", resp.StatusCode)
}
return nil
}(); err != nil {
return err
}

req.SetBasicAuth(c.String("write-key"), "")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("AnonymousId", anonymousId)

resp, err := client.Do(req)
if err != nil {
return err
}
b, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
fmt.Printf("%s\n%s\n", resp.Status, b)

return fmt.Errorf("status code: %d", resp.StatusCode)
}
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions config/backend-config/namespace_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand Down Expand Up @@ -138,13 +139,12 @@ func (nc *namespaceConfig) makeHTTPRequest(ctx context.Context, url string) ([]b
return nil, err
}

defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

defer func() { _ = resp.Body.Close() }()

if resp.StatusCode >= 300 {
return nil, getNotOKError(respBody, resp.StatusCode)
}
Expand Down
4 changes: 2 additions & 2 deletions config/backend-config/single_workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/types"
)

Expand Down Expand Up @@ -150,13 +151,12 @@ func (wc *singleWorkspaceConfig) makeHTTPRequest(ctx context.Context, url string
return nil, err
}

defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

defer func() { _ = resp.Body.Close() }()

if resp.StatusCode >= 300 {
return nil, getNotOKError(respBody, resp.StatusCode)
}
Expand Down
3 changes: 2 additions & 1 deletion enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
Expand Down Expand Up @@ -413,7 +414,7 @@ func (handle *HandleT) sendMetric(ctx context.Context, netClient *http.Client, c
httpStatTags["status"] = strconv.Itoa(resp.StatusCode)
stats.Default.NewTaggedStat(STAT_REPORTING_HTTP_REQ, stats.CountType, httpStatTags).Count(1)

defer resp.Body.Close()
defer func() { httputil.CloseResponse(resp) }()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
handle.log.Error(err.Error())
Expand Down
8 changes: 2 additions & 6 deletions enterprise/suppress-user/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/enterprise/suppress-user/model"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
Expand Down Expand Up @@ -175,12 +176,7 @@ func (s *Syncer) sync(token []byte) ([]model.Suppression, []byte, error) {
if err != nil {
return err
}
defer func() {
err := resp.Body.Close()
if err != nil {
s.log.Error(err)
}
}()
defer func() { httputil.CloseResponse(resp) }()

// If statusCode is not 2xx, then returning empty regulations
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
Expand Down
2 changes: 1 addition & 1 deletion gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ func (gateway *HandleT) getWarehousePending(payload []byte) bool {
return false
}

defer resp.Body.Close()
defer func() { rs_httputil.CloseResponse(resp) }()

var whPendingResponse warehouseutils.PendingEventsResponseT
respData, err := io.ReadAll(resp.Body)
Expand Down
5 changes: 3 additions & 2 deletions gateway/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/testhelper/rand"
whUtil "github.com/rudderlabs/rudder-server/testhelper/webhook"
"github.com/rudderlabs/rudder-server/utils/httputil"
)

func TestGatewayIntegration(t *testing.T) {
Expand Down Expand Up @@ -183,7 +184,7 @@ func testGatewayByAppType(t *testing.T, appType string) {
resp, err := http.Get(healthEndpoint)
require.ErrorContains(t, err, "connection refused")
require.Nil(t, resp)
defer resp.Body.Close()
defer func() { httputil.CloseResponse(resp) }()

// Checking now that the configuration has been processed and the server can start
t.Log("Checking health endpoint at", healthEndpoint)
Expand Down Expand Up @@ -316,7 +317,7 @@ func sendEvent(t *testing.T, httpPort int, payload *strings.Reader, callType, wr

res, err := httpClient.Do(req)
require.NoError(t, err)
defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()

body, err := io.ReadAll(res.Body)
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion gateway/webhook/webhookTransformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"github.com/rudderlabs/rudder-server/gateway/response"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
)

Expand Down Expand Up @@ -57,7 +58,7 @@ func (bt *batchWebhookTransformerT) transform(events [][]byte, sourceType string
}

respBody, err := io.ReadAll(resp.Body)
_ = resp.Body.Close()
func() { httputil.CloseResponse(resp) }()

if err != nil {
bt.stats.failedStat.Count(len(events))
Expand Down
5 changes: 3 additions & 2 deletions integration_test/docker_test/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/testhelper/rand"
whUtil "github.com/rudderlabs/rudder-server/testhelper/webhook"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)
Expand Down Expand Up @@ -655,7 +656,7 @@ func getEvent(url, method string) (string, error) {
if err != nil {
return "", err
}
defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()

body, err := io.ReadAll(res.Body)
if err != nil {
Expand Down Expand Up @@ -719,7 +720,7 @@ func sendEvent(t *testing.T, payload *strings.Reader, callType, writeKey string)
t.Logf("sendEvent error: %v", err)
return
}
defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()

body, err := io.ReadAll(res.Body)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions integration_test/multi_tentant_test/multi_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/rudderlabs/rudder-server/testhelper/health"
"github.com/rudderlabs/rudder-server/testhelper/rand"
whUtil "github.com/rudderlabs/rudder-server/testhelper/webhook"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)

Expand Down Expand Up @@ -215,7 +216,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) {
require.ErrorContains(t, err, "connection refused")
require.Nil(t, resp)
if err == nil {
defer func() { _ = resp.Body.Close() }()
defer func() { httputil.CloseResponse(resp) }()
}

// Pushing valid configuration via ETCD
Expand Down Expand Up @@ -415,7 +416,7 @@ func sendEvent(t *testing.T, httpPort int, payload *strings.Reader, callType, wr

res, err := httpClient.Do(req)
require.NoError(t, err)
defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()

body, err := io.ReadAll(res.Body)
require.NoError(t, err)
Expand Down
3 changes: 2 additions & 1 deletion processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/services/transientsource"
"github.com/rudderlabs/rudder-server/utils/bytesize"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
Expand Down Expand Up @@ -562,7 +563,7 @@ func (proc *HandleT) makeFeaturesFetchCall() bool {
return true
}

defer func() { _ = res.Body.Close() }()
defer func() { httputil.CloseResponse(res) }()
body, err := io.ReadAll(res.Body)
if err != nil {
return true
Expand Down
5 changes: 3 additions & 2 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand Down Expand Up @@ -178,7 +179,7 @@ func GetVersion() (transformerBuildVersion string) {
transformerBuildVersion = fmt.Sprintf("No response from transformer. %s", transformerBuildVersion)
return
}
defer func() { _ = resp.Body.Close() }()
defer func() { httputil.CloseResponse(resp) }()
if resp.StatusCode == http.StatusOK {
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
Expand Down Expand Up @@ -387,8 +388,8 @@ func (trans *HandleT) doPost(ctx context.Context, rawJSON []byte, url string, ta
if reqErr != nil {
return reqErr
}
defer func() { httputil.CloseResponse(resp) }()
respData, reqErr = io.ReadAll(resp.Body)
_ = resp.Body.Close()
return reqErr
},
backoff.WithMaxRetries(backoff.NewExponentialBackOff(), uint64(maxRetry)),
Expand Down
3 changes: 2 additions & 1 deletion regulation-worker/cmd/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-server/services/kvstoremanager"
"github.com/rudderlabs/rudder-server/utils/httputil"
)

var (
Expand Down Expand Up @@ -127,7 +128,7 @@ func TestFlow(t *testing.T) {
if err != nil {
return err
}
defer resp.Body.Close()
defer func() { httputil.CloseResponse(resp) }()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("status code not OK")
}
Expand Down
10 changes: 3 additions & 7 deletions regulation-worker/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
)

Expand Down Expand Up @@ -51,12 +52,7 @@ func (j *JobAPI) Get(ctx context.Context) (model.Job, error) {
if err != nil {
return model.Job{}, err
}
defer func() {
err := resp.Body.Close()
if err != nil {
pkgLogger.Errorf("error while closing response body: %v", err)
}
}()
defer func() { httputil.CloseResponse(resp) }()
pkgLogger.Debugf("obtained response code: %v", resp.StatusCode, "response body: ", resp.Body)

// if successful
Expand Down Expand Up @@ -140,7 +136,7 @@ func (j *JobAPI) UpdateStatus(ctx context.Context, status model.JobStatus, jobID
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
defer func() { httputil.CloseResponse(resp) }()

pkgLogger.Debugf("response code: %v", resp.StatusCode, "response body: %v", resp.Body)
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
Expand Down
3 changes: 2 additions & 1 deletion regulation-worker/internal/delete/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
"github.com/rudderlabs/rudder-server/services/oauth"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/logger"
)

Expand Down Expand Up @@ -95,7 +96,7 @@ func (api *APIManager) deleteWithRetry(ctx context.Context, job model.Job, desti
}
return model.JobStatusFailed
}
defer resp.Body.Close()
defer func() { httputil.CloseResponse(resp) }()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return model.JobStatusFailed
Expand Down
Loading

0 comments on commit 11d6123

Please sign in to comment.